Class: Webhookdb::SyncTarget::HttpRoutine
- Defined in:
- lib/webhookdb/sync_target.rb
Instance Attribute Summary
Attributes inherited from Routine
#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr
Instance Method Summary collapse
Methods inherited from Routine
#dataset_to_sync, #initialize, #record
Constructor Details
This class inherits a constructor from Webhookdb::SyncTarget::Routine
Instance Method Details
#_flush_http_chunk(chunk) ⇒ Object
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/webhookdb/sync_target.rb', line 377 def _flush_http_chunk(chunk) sint = self.sync_target.service_integration body = { rows: chunk, integration_id: sint.opaque_id, integration_service: sint.service_name, table: sint.table_name, sync_timestamp: self.now, } cleanurl, authparams = Webhookdb::Http.extract_url_auth(self.sync_target.connection_url) Webhookdb::Http.post( cleanurl, body, timeout: sint.organization.sync_target_timeout, logger: self.sync_target.logger, basic_auth: authparams, ) latest_ts = chunk.last.fetch(self.replicator..name) # The client committed the sync page we sent. Record it in case of a future error, # so we don't re-send the same page. self.record(latest_ts) chunk.clear end |
#run ⇒ Object
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 |
# File 'lib/webhookdb/sync_target.rb', line 352 def run page_size = self.sync_target.page_size self.dataset_to_sync do |ds| chunk = [] ds.paged_each(rows_per_fetch: page_size) do |row| chunk << row self._flush_http_chunk(chunk) if chunk.size >= page_size end self._flush_http_chunk(chunk) unless chunk.empty? # We should save 'now' as the timestamp, rather than the last updated row. # This is important because other we'd keep trying to sync the last row synced. self.record(self.now) end rescue Webhookdb::Http::Error, Errno::ECONNRESET, Net::ReadTimeout, Net::OpenTimeout, OpenSSL::SSL::SSLError => e # This is handled well so no need to re-raise. # We already committed the last page that was successful, # so we can just stop syncing at this point to try again later. # Don't spam our logs with downstream errors idem_key = "sync_target_http_error-#{self.sync_target.id}-#{e.class.name}" Webhookdb::Idempotency.every(1.hour).in_memory.under_key(idem_key) do self.sync_target.logger.warn("sync_target_http_error", error: e) end end |