Class: Webhookdb::SyncTarget::Routine

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/sync_target.rb

Direct Known Subclasses

DatabaseRoutine, HttpRoutine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(now, sync_target) ⇒ Routine

Returns a new instance of Routine.



315
316
317
318
319
320
321
# File 'lib/webhookdb/sync_target.rb', line 315

# Capture everything a sync run needs from the sync target up front.
#
# @param now [Object] upper bound used when selecting rows to sync
# @param sync_target [Object] must respond to +last_synced_at+ and
#   +service_integration+ (presumably a Webhookdb::SyncTarget -- confirm)
def initialize(now, sync_target)
  @now, @sync_target = now, sync_target
  @last_synced_at = @sync_target.last_synced_at
  integration = @sync_target.service_integration
  @replicator = integration.replicator
  # Wrap the replicator's timestamp column name so it can be used in
  # dataset conditions and ordering.
  column_name = @replicator.timestamp_column.name
  @timestamp_expr = Sequel[column_name]
end

Instance Attribute Details

#last_synced_at ⇒ Time

Returns:

  • (Time)


312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/webhookdb/sync_target.rb', line 312

# Base class for a single sync run of a sync target.
# It captures the inputs of the run in the constructor and provides the
# shared row selection (+dataset_to_sync+) and bookkeeping (+record+).
# +run+ is abstract and must be supplied by subclasses.
class Routine
  # @return [Time] the sync target's +last_synced_at+ as captured when the
  #   routine was built. +dataset_to_sync+ treats a falsy value as
  #   "no lower bound".
  attr_reader :last_synced_at
  attr_reader :now, :sync_target, :replicator, :timestamp_expr

  # @param now [Object] upper bound for row timestamps selected by this run
  # @param sync_target [Object] must respond to +last_synced_at+ and
  #   +service_integration+ (presumably a Webhookdb::SyncTarget -- confirm)
  def initialize(now, sync_target)
    @now = now
    @sync_target = sync_target
    @last_synced_at = sync_target.last_synced_at
    @replicator = sync_target.service_integration.replicator
    @timestamp_expr = Sequel[@replicator.timestamp_column.name]
  end

  # Perform the sync. Abstract in this class.
  #
  # @raise [NotImplementedError] always
  def run = raise NotImplementedError

  # Get the dataset of rows that need to be synced and yield it to the block.
  # Note that there are a couple race conditions here.
  # First, those in https://github.com/webhookdb/webhookdb/issues/571.
  # There is also the condition that we could send the same row
  # multiple times when the row timestamp is set to last_synced_at but
  # it wasn't in the last sync; however that is likely not a big problem
  # since clients need to handle updates in any case.
  #
  # @yieldparam ds the filtered dataset, ordered by the timestamp column
  # @return whatever the replicator's +admin_dataset+ returns
  def dataset_to_sync
    # Use admin dataset, since the client could be using all their readonly conns.
    @replicator.admin_dataset do |ds|
      # Find rows updated before we started
      tscond = (@timestamp_expr <= @now)
      # Find rows updated after the last sync was run (bounds are inclusive)
      tscond &= (@timestamp_expr >= @last_synced_at) if @last_synced_at
      ds = ds.where(tscond)
      # We want to paginate from oldest to newest
      ds = ds.order(@timestamp_expr)
      yield(ds)
    end
  end

  # Store +last_synced_at+ on the sync target.
  #
  # @param last_synced_at [Object] value to write to the target's +last_synced_at+
  # @return the result of +sync_target.update+
  # @raise [Webhookdb::SyncTarget::Deleted] when the update raises
  #   Sequel::NoExistingObject
  def record(last_synced_at)
    sync_target.update(last_synced_at:)
  rescue Sequel::NoExistingObject => e
    raise Webhookdb::SyncTarget::Deleted, e
  end
end

#now ⇒ Object (readonly)

Returns the value of attribute now.



313
314
315
# File 'lib/webhookdb/sync_target.rb', line 313

# Reader for the +@now+ value captured by the constructor.
def now = @now

#replicator ⇒ Object (readonly)

Returns the value of attribute replicator.



313
314
315
# File 'lib/webhookdb/sync_target.rb', line 313

# Reader for the +@replicator+ value captured by the constructor.
def replicator = @replicator

#sync_target ⇒ Object (readonly)

Returns the value of attribute sync_target.



313
314
315
# File 'lib/webhookdb/sync_target.rb', line 313

# Reader for the +@sync_target+ value captured by the constructor.
def sync_target = @sync_target

#timestamp_expr ⇒ Object (readonly)

Returns the value of attribute timestamp_expr.



313
314
315
# File 'lib/webhookdb/sync_target.rb', line 313

# Reader for the +@timestamp_expr+ value captured by the constructor.
def timestamp_expr = @timestamp_expr

Instance Method Details

#dataset_to_sync ⇒ Object

Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.



332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/webhookdb/sync_target.rb', line 332

# Yield the dataset of rows that this run should sync.
#
# Known races: see https://github.com/webhookdb/webhookdb/issues/571.
# In addition, a row whose timestamp equals last_synced_at but which was
# not part of the previous sync can be sent more than once; that is likely
# acceptable since clients need to handle updates in any case.
def dataset_to_sync
  # Go through the admin dataset: the client could be using all their readonly conns.
  @replicator.admin_dataset do |admin_ds|
    # Upper bound: only rows updated before this run started.
    window = @timestamp_expr <= @now
    # Lower bound: only rows updated since the last sync, when there was one.
    window &= (@timestamp_expr >= @last_synced_at) if @last_synced_at
    # Oldest first, so pagination walks forward in time.
    yield(admin_ds.where(window).order(@timestamp_expr))
  end
end

#record(last_synced_at) ⇒ Object



346
347
348
349
350
# File 'lib/webhookdb/sync_target.rb', line 346

# Write +last_synced_at+ to the sync target.
#
# @param last_synced_at [Object] value to store on the target
# @return the result of +sync_target.update+
# @raise [Webhookdb::SyncTarget::Deleted] when the update raises
#   Sequel::NoExistingObject
def record(last_synced_at)
  sync_target.update(last_synced_at: last_synced_at)
rescue Sequel::NoExistingObject => err
  raise Webhookdb::SyncTarget::Deleted, err
end

#run ⇒ Object

Raises:

  • (NotImplementedError)


323
# File 'lib/webhookdb/sync_target.rb', line 323

# Abstract entry point for the sync; this implementation only raises.
#
# @raise [NotImplementedError] always
def run
  raise NotImplementedError
end