Class: Webhookdb::SyncTarget::Routine
- Inherits:
-
Object
- Object
- Webhookdb::SyncTarget::Routine
- Defined in:
- lib/webhookdb/sync_target.rb
Direct Known Subclasses
Instance Attribute Summary collapse
- #last_synced_at ⇒ Time
-
#now ⇒ Object
readonly
Returns the value of attribute now.
-
#replicator ⇒ Object
readonly
Returns the value of attribute replicator.
-
#sync_target ⇒ Object
readonly
Returns the value of attribute sync_target.
-
#timestamp_expr ⇒ Object
readonly
Returns the value of attribute timestamp_expr.
Instance Method Summary collapse
-
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced.
-
#initialize(now, sync_target) ⇒ Routine
constructor
A new instance of Routine.
- #record(last_synced_at) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(now, sync_target) ⇒ Routine
Returns a new instance of Routine.
313 314 315 316 317 318 319 |
# File 'lib/webhookdb/sync_target.rb', line 313

# Set up the state shared by a single sync run.
#
# @param now [Object] upper bound of the sync window; rows are compared
#   against it in +dataset_to_sync+ (presumably a Time, like
#   +last_synced_at+ -- confirm with callers).
# @param sync_target [Object] must respond to +last_synced_at+ and
#   +service_integration+.
def initialize(now, sync_target)
  @now = now
  @sync_target = sync_target
  # Capture the previous sync time up front; it is the lower bound of the window.
  @last_synced_at = sync_target.last_synced_at
  @replicator = sync_target.service_integration.replicator
  # NOTE(review): the rendered listing was garbled here (`@replicator..name`).
  # Restored to the replicator's timestamp column -- confirm against the real file.
  @timestamp_expr = Sequel[@replicator.timestamp_column.name]
end
Instance Attribute Details
#last_synced_at ⇒ Time
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/webhookdb/sync_target.rb', line 310

# Base class for sync routines. It holds the state shared by one sync run
# (the window bounds, the replicator, the timestamp expression) and leaves
# +run+ for subclasses to implement.
class Routine
  attr_reader :now, :sync_target, :replicator, :timestamp_expr

  # @param now [Object] upper bound of the sync window (presumably a Time -- confirm).
  # @param sync_target [Object] must respond to +last_synced_at+,
  #   +service_integration+ and +update+.
  def initialize(now, sync_target)
    @now = now
    @sync_target = sync_target
    @last_synced_at = sync_target.last_synced_at
    @replicator = sync_target.service_integration.replicator
    # NOTE(review): the rendered listing was garbled here (`@replicator..name`).
    # Restored to the replicator's timestamp column -- confirm against the real file.
    @timestamp_expr = Sequel[@replicator.timestamp_column.name]
  end

  # Perform the sync. Subclasses must override this.
  #
  # @raise [NotImplementedError] always, in this base class.
  def run
    raise NotImplementedError
  end

  # Get the dataset of rows that need to be synced.
  # Note that there are a couple race conditions here.
  # First, those in https://github.com/webhookdb/webhookdb/issues/571.
  # There is also the condition that we could send the same row
  # multiple times when the row timestamp is set to last_synced_at but
  # it wasn't in the last sync; however that is likely not a big problem
  # since clients need to handle updates in any case.
  #
  # @yield [ds] the filtered dataset, ordered by the timestamp expression.
  def dataset_to_sync
    # Use admin dataset, since the client could be using all their readonly conns.
    @replicator.admin_dataset do |ds|
      # Find rows updated before we started
      tscond = (@timestamp_expr <= @now)
      # Find rows updated after the last sync was run (no lower bound on a first sync)
      tscond &= (@timestamp_expr >= @last_synced_at) if @last_synced_at
      ds = ds.where(tscond)
      # We want to paginate from oldest to newest
      ds = ds.order(@timestamp_expr)
      yield(ds)
    end
  end

  # Persist the given time as the sync target's +last_synced_at+.
  #
  # @param last_synced_at [Time] value to store on the sync target.
  # @raise [Webhookdb::SyncTarget::Deleted] when the update raises
  #   Sequel::NoExistingObject.
  def record(last_synced_at)
    self.sync_target.update(last_synced_at:)
  rescue Sequel::NoExistingObject => e
    raise Webhookdb::SyncTarget::Deleted, e
  end
end
#now ⇒ Object (readonly)
Returns the value of attribute now.
311 312 313 |
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the +now+ attribute.
#
# @return [Object] the current value of +@now+.
def now = @now
#replicator ⇒ Object (readonly)
Returns the value of attribute replicator.
311 312 313 |
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the +replicator+ attribute.
#
# @return [Object] the current value of +@replicator+.
def replicator = @replicator
#sync_target ⇒ Object (readonly)
Returns the value of attribute sync_target.
311 312 313 |
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the +sync_target+ attribute.
#
# @return [Object] the current value of +@sync_target+.
def sync_target = @sync_target
#timestamp_expr ⇒ Object (readonly)
Returns the value of attribute timestamp_expr.
311 312 313 |
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the +timestamp_expr+ attribute.
#
# @return [Object] the current value of +@timestamp_expr+.
def timestamp_expr
  @timestamp_expr
end
Instance Method Details
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.
330 331 332 333 334 335 336 337 338 339 340 341 342 |
# File 'lib/webhookdb/sync_target.rb', line 330

# Yield the dataset of rows that fall inside the current sync window,
# ordered by the timestamp expression.
#
# @yield [dataset] the filtered and ordered dataset.
def dataset_to_sync
  # The admin dataset is used because the client may have every readonly connection busy.
  @replicator.admin_dataset do |admin_ds|
    # Upper bound: only rows stamped at or before the moment this run began.
    window = @timestamp_expr <= @now
    # Lower bound: only applied once a previous sync time exists.
    window &= (@timestamp_expr >= @last_synced_at) if @last_synced_at
    # Oldest first, so pagination walks forward in time.
    yield(admin_ds.where(window).order(@timestamp_expr))
  end
end
#record(last_synced_at) ⇒ Object
344 345 346 347 348 |
# File 'lib/webhookdb/sync_target.rb', line 344

# Store the given time as the sync target's +last_synced_at+.
#
# @param last_synced_at [Time] value to write to the sync target.
# @raise [Webhookdb::SyncTarget::Deleted] when the update raises
#   Sequel::NoExistingObject.
def record(last_synced_at)
  sync_target.update(last_synced_at: last_synced_at)
rescue Sequel::NoExistingObject => missing_row
  # Re-raise as the project's own error type, passing the Sequel error along as the message.
  raise Webhookdb::SyncTarget::Deleted, missing_row
end
#run ⇒ Object
321 |
# File 'lib/webhookdb/sync_target.rb', line 321

# Placeholder entry point for a sync run; this definition always raises.
#
# @raise [NotImplementedError] unconditionally.
def run
  raise NotImplementedError
end