Class: Webhookdb::SyncTarget::Routine
- Inherits:
-
Object
- Object
- Webhookdb::SyncTarget::Routine
- Defined in:
- lib/webhookdb/sync_target.rb
Direct Known Subclasses
Instance Attribute Summary collapse
- #last_synced_at ⇒ Time
-
#now ⇒ Object
readonly
Returns the value of attribute now.
-
#replicator ⇒ Object
readonly
Returns the value of attribute replicator.
-
#sync_target ⇒ Object
readonly
Returns the value of attribute sync_target.
-
#timestamp_expr ⇒ Object
readonly
Returns the value of attribute timestamp_expr.
Instance Method Summary collapse
-
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced.
-
#initialize(now, sync_target) ⇒ Routine
constructor
A new instance of Routine.
- #record(last_synced_at) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(now, sync_target) ⇒ Routine
Returns a new instance of Routine.
315 316 317 318 319 320 321 |
# File 'lib/webhookdb/sync_target.rb', line 315 def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator.timestamp_column.name] end |
Instance Attribute Details
#last_synced_at ⇒ Time
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/webhookdb/sync_target.rb', line 312 class Routine attr_reader :now, :sync_target, :replicator, :timestamp_expr def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator.timestamp_column.name] end def run = raise NotImplementedError # Get the dataset of rows that need to be synced. # Note that there are a couple race conditions here. # First, those in https://github.com/webhookdb/webhookdb/issues/571. # There is also the condition that we could send the same row # multiple times when the row timestamp is set to last_synced_at but # it wasn't in the last sync; however that is likely not a big problem # since clients need to handle updates in any case. def dataset_to_sync # Use admin dataset, since the client could be using all their readonly conns. @replicator.admin_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end def record(last_synced_at) self.sync_target.update(last_synced_at:) rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end end |
#now ⇒ Object (readonly)
Returns the value of attribute now.
313 314 315 |
# File 'lib/webhookdb/sync_target.rb', line 313 def now @now end |
#replicator ⇒ Object (readonly)
Returns the value of attribute replicator.
313 314 315 |
# File 'lib/webhookdb/sync_target.rb', line 313 def replicator @replicator end |
#sync_target ⇒ Object (readonly)
Returns the value of attribute sync_target.
313 314 315 |
# File 'lib/webhookdb/sync_target.rb', line 313 def sync_target @sync_target end |
#timestamp_expr ⇒ Object (readonly)
Returns the value of attribute timestamp_expr.
313 314 315 |
# File 'lib/webhookdb/sync_target.rb', line 313 def timestamp_expr @timestamp_expr end |
Instance Method Details
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.
332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/webhookdb/sync_target.rb', line 332 def dataset_to_sync # Use admin dataset, since the client could be using all their readonly conns. @replicator.admin_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end |
#record(last_synced_at) ⇒ Object
346 347 348 349 350 |
# File 'lib/webhookdb/sync_target.rb', line 346 def record(last_synced_at) self.sync_target.update(last_synced_at:) rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end |
#run ⇒ Object
323 |
# File 'lib/webhookdb/sync_target.rb', line 323 def run = raise NotImplementedError |