Class: Webhookdb::SyncTarget::HttpRoutine

Inherits:
Routine
  • Object
show all
Defined in:
lib/webhookdb/sync_target.rb

Instance Attribute Summary

Attributes inherited from Routine

#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr

Instance Method Summary collapse

Methods inherited from Routine

#dataset_to_sync, #initialize, #record

Constructor Details

This class inherits a constructor from Webhookdb::SyncTarget::Routine

Instance Method Details

#_flush_http_chunk(chunk) ⇒ Object



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/webhookdb/sync_target.rb', line 379

def _flush_http_chunk(chunk)
  sint = self.sync_target.service_integration
  body = {
    rows: chunk,
    integration_id: sint.opaque_id,
    integration_service: sint.service_name,
    table: sint.table_name,
    sync_timestamp: self.now,
  }
  cleanurl, authparams = Webhookdb::Http.extract_url_auth(self.sync_target.connection_url)
  Webhookdb::Http.post(
    cleanurl,
    body,
    timeout: sint.organization.sync_target_timeout,
    logger: self.sync_target.logger,
    basic_auth: authparams,
  )
  latest_ts = chunk.last.fetch(self.replicator.timestamp_column.name)
  # The client committed the sync page we sent. Record it in case of a future error,
  # so we don't re-send the same page.
  self.record(latest_ts)
  chunk.clear
end

#runObject



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/webhookdb/sync_target.rb', line 354

def run
  page_size = self.sync_target.page_size
  self.dataset_to_sync do |ds|
    chunk = []
    ds.paged_each(rows_per_fetch: page_size) do |row|
      chunk << row
      self._flush_http_chunk(chunk) if chunk.size >= page_size
    end
    self._flush_http_chunk(chunk) unless chunk.empty?
    # We should save 'now' as the timestamp, rather than the last updated row.
    # This is important because other we'd keep trying to sync the last row synced.
    self.record(self.now)
  end
rescue Webhookdb::Http::Error, Errno::ECONNRESET, Net::ReadTimeout, Net::OpenTimeout, OpenSSL::SSL::SSLError => e
  # This is handled well so no need to re-raise.
  # We already committed the last page that was successful,
  # so we can just stop syncing at this point to try again later.

  # Don't spam our logs with downstream errors
  idem_key = "sync_target_http_error-#{self.sync_target.id}-#{e.class.name}"
  Webhookdb::Idempotency.every(1.hour).in_memory.under_key(idem_key) do
    self.sync_target.logger.warn("sync_target_http_error", error: e)
  end
end