Class: Webhookdb::SyncTarget::Routine

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/sync_target.rb

Direct Known Subclasses

DatabaseRoutine, HttpRoutine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(now, sync_target) ⇒ Routine

Returns a new instance of Routine.



313
314
315
316
317
318
319
# File 'lib/webhookdb/sync_target.rb', line 313

# Capture the state for one sync run.
#
# @param now [Object] reference time for the run; stored as given
# @param sync_target [Object] must respond to +last_synced_at+ and
#   +service_integration+ (whose +replicator+ exposes +timestamp_column+)
def initialize(now, sync_target)
  @now = now
  @sync_target = sync_target
  # Snapshot the previous sync time at construction.
  @last_synced_at = @sync_target.last_synced_at
  integration = @sync_target.service_integration
  @replicator = integration.replicator
  # Wrap the timestamp column name so it can be used in Sequel expressions.
  ts_column = @replicator.timestamp_column
  @timestamp_expr = Sequel[ts_column.name]
end

Instance Attribute Details

#last_synced_at ⇒ Time

Returns:

  • (Time)


310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/webhookdb/sync_target.rb', line 310

# Base class holding the state shared by one run of a sync target:
# the run's reference time, the sync target itself, the time it was last
# synced, its replicator, and a Sequel expression for the replicator's
# timestamp column. Subclasses must implement #run.
class Routine
  # The sync target's +last_synced_at+ as captured when the routine was
  # constructed. May be nil, in which case #dataset_to_sync applies
  # no lower bound.
  # @return [Time, nil]
  attr_reader :last_synced_at

  attr_reader :now, :sync_target, :replicator, :timestamp_expr

  # @param now [Time] reference time for this run; rows with a later
  #   timestamp are excluded by #dataset_to_sync
  # @param sync_target [Object] must respond to +last_synced_at+ and
  #   +service_integration+ (whose +replicator+ exposes +timestamp_column+)
  def initialize(now, sync_target)
    @now = now
    @sync_target = sync_target
    @last_synced_at = sync_target.last_synced_at
    @replicator = sync_target.service_integration.replicator
    @timestamp_expr = Sequel[@replicator.timestamp_column.name]
  end

  # Perform the sync. Subclasses must override this.
  # @raise [NotImplementedError] always, in this base class
  def run = raise NotImplementedError

  # Get the dataset of rows that need to be synced.
  # Note that there are a couple race conditions here.
  # First, those in https://github.com/webhookdb/webhookdb/issues/571.
  # There is also the condition that we could send the same row
  # multiple times when the row timestamp is set to last_synced_at but
  # it wasn't in the last sync; however that is likely not a big problem
  # since clients need to handle updates in any case.
  #
  # @yield [ds] the filtered dataset, ordered by the timestamp column
  # @return [Object] whatever +admin_dataset+ returns
  def dataset_to_sync
    # Use admin dataset, since the client could be using all their readonly conns.
    @replicator.admin_dataset do |ds|
      # Find rows updated before we started
      tscond = (@timestamp_expr <= @now)
      # Find rows updated after the last sync was run
      @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at))
      ds = ds.where(tscond)
      # We want to paginate from oldest to newest
      ds = ds.order(@timestamp_expr)
      yield(ds)
    end
  end

  # Store +last_synced_at+ on the sync target.
  #
  # @param last_synced_at [Time] value to write
  # @raise [Webhookdb::SyncTarget::Deleted] when the update raises
  #   Sequel::NoExistingObject (presumably the row was removed mid-sync)
  def record(last_synced_at)
    self.sync_target.update(last_synced_at:)
  rescue Sequel::NoExistingObject => e
    raise Webhookdb::SyncTarget::Deleted, e
  end
end

#now ⇒ Object (readonly)

Returns the value of attribute now.



311
312
313
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the +now+ value stored by the constructor.
def now = @now

#replicator ⇒ Object (readonly)

Returns the value of attribute replicator.



311
312
313
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the replicator stored by the constructor.
def replicator = @replicator

#sync_target ⇒ Object (readonly)

Returns the value of attribute sync_target.



311
312
313
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the sync target stored by the constructor.
def sync_target = @sync_target

#timestamp_expr ⇒ Object (readonly)

Returns the value of attribute timestamp_expr.



311
312
313
# File 'lib/webhookdb/sync_target.rb', line 311

# Reader for the timestamp-column expression stored by the constructor.
def timestamp_expr = @timestamp_expr

Instance Method Details

#dataset_to_sync ⇒ Object

Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.



330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/webhookdb/sync_target.rb', line 330

# Yield the dataset of rows to sync: rows whose timestamp is at or before
# +@now+ and, when a previous sync time is known, at or after it.
# Rows are ordered by timestamp so pagination runs oldest to newest.
#
# Known race conditions: see https://github.com/webhookdb/webhookdb/issues/571.
# A row whose timestamp equals last_synced_at may also be sent more than once;
# this is likely acceptable since clients need to handle updates in any case.
def dataset_to_sync
  # The admin dataset is used because the client could be holding
  # all of their readonly connections.
  @replicator.admin_dataset do |dataset|
    not_after_start = @timestamp_expr <= @now
    window =
      if @last_synced_at
        # Also bound from below by the previous sync time.
        not_after_start & (@timestamp_expr >= @last_synced_at)
      else
        not_after_start
      end
    yield(dataset.where(window).order(@timestamp_expr))
  end
end

#record(last_synced_at) ⇒ Object



344
345
346
347
348
# File 'lib/webhookdb/sync_target.rb', line 344

# Write +last_synced_at+ to the sync target.
# A Sequel::NoExistingObject raised by the update is re-raised as
# Webhookdb::SyncTarget::Deleted.
def record(last_synced_at)
  begin
    sync_target.update(last_synced_at: last_synced_at)
  rescue Sequel::NoExistingObject => missing
    raise Webhookdb::SyncTarget::Deleted, missing
  end
end

#run ⇒ Object

Raises:

  • (NotImplementedError)


321
# File 'lib/webhookdb/sync_target.rb', line 321

# Abstract entry point: always raises NotImplementedError here.
def run
  raise NotImplementedError
end