Module: Webhookdb::Backfiller::Bulk
- Included in: Replicator::AtomSingleFeedV1::Backfiller, Replicator::AwsPricingV1::ServiceBackfiller, Replicator::EmailOctopusContactV1::ContactBackfiller, Replicator::EmailOctopusEventV1::EventBackfiller, Replicator::IcalendarCalendarV1::Upserter, Replicator::SponsyV1Mixin::PublicationChildBackfiller
- Defined in: lib/webhookdb/backfiller.rb
Instance Method Summary
- #conditional_upsert? ⇒ Boolean
  Whether `_update_where_expr` should be used as the condition for updating conflicting rows. Defaults to false, since most bulk upserting is backfill, which should only involve upserting new rows anyway.
- #dry_run? ⇒ Boolean
- #flush_pending_inserts ⇒ Object
- #handle_item(body) ⇒ Array(String, Hash), Array(nil, nil)
  Add the item to the pending upserts, and flush them as a page once upsert_page_size items are pending.
- #pending_inserts ⇒ Object
- #prepare_body(_body) ⇒ Object
- #remote_key_column_name ⇒ Object
- #upsert_page_size ⇒ Object
- #upserting_replicator ⇒ Object
Instance Method Details
#conditional_upsert? ⇒ Boolean
Whether `_update_where_expr` should be used as the condition for updating conflicting rows. Defaults to false, since most bulk upserting is backfill, which should only involve upserting new rows anyway.
    # File 'lib/webhookdb/backfiller.rb', line 74
    def conditional_upsert? = false
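conditional_upsert? only decides whether flush_pending_inserts passes the replicator's `_update_where_expr` as `update_where:` to `insert_conflict`. In PostgreSQL terms that is the WHERE clause of ON CONFLICT DO UPDATE: a conflicting row is updated only when the condition holds. The pure-Ruby sketch below emulates that behavior in memory; `upsert_rows`, the `newer_only` rule, and the `updated_at` column are all hypothetical, since the real condition comes from each replicator's `_update_where_expr`.

```ruby
# Hypothetical in-memory stand-in for
#   INSERT ... ON CONFLICT (key) DO UPDATE SET ... [WHERE update_where]
# `table` maps a remote key to a row Hash. `update_where` is a lambda taking
# (existing_row, incoming_row); nil means every conflicting row is updated,
# which mirrors the conditional_upsert? == false default.
def upsert_rows(table, rows, key:, update_where: nil)
  rows.each do |row|
    k = row.fetch(key)
    existing = table[k]
    if existing.nil?
      table[k] = row                 # no conflict: plain insert
    elsif update_where.nil? || update_where.call(existing, row)
      table[k] = existing.merge(row) # conflict and condition passes: update
    end
    # Otherwise the condition rejected the update and the existing row is kept.
  end
  table
end

# Hypothetical condition: only accept rows newer than what is stored.
newer_only = ->(existing, incoming) { incoming[:updated_at] > existing[:updated_at] }

stored = { "a" => { id: "a", updated_at: 2, name: "current" } }
incoming = [
  { id: "a", updated_at: 1, name: "stale" },
  { id: "b", updated_at: 1, name: "new" },
]

unconditional = upsert_rows(stored.dup, incoming, key: :id)
conditional = upsert_rows(stored.dup, incoming, key: :id, update_where: newer_only)
# unconditional["a"][:name] is "stale"; conditional["a"][:name] is still "current".
```

Both runs insert the new "b" row; only the conditional run refuses to overwrite "a" with an older version. For a backfill that mostly inserts new rows, the condition rarely matters, which fits the default of false.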
#dry_run? ⇒ Boolean
    # File 'lib/webhookdb/backfiller.rb', line 76
    def dry_run? = false
#flush_pending_inserts ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 91
    def flush_pending_inserts
      return if self.dry_run?
      return if self.pending_inserts.empty?
      rows_to_insert = self.pending_inserts.values
      # Only restrict updates of conflicting rows when conditional_upsert? is true.
      update_where = self.conditional_upsert? ? self.upserting_replicator._update_where_expr : nil
      self.upserting_replicator.admin_dataset(timeout: :fast) do |ds|
        # INSERT ... ON CONFLICT (remote key) DO UPDATE SET ... [WHERE update_where]
        insert_ds = ds.insert_conflict(
          target: self.upserting_replicator._remote_key_column.name,
          update: self.upserting_replicator._upsert_update_expr(rows_to_insert.first),
          update_where:,
        )
        insert_ds.multi_insert(rows_to_insert)
      end
      self.pending_inserts.clear
    end
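The guard-insert-clear shape of flush_pending_inserts can be isolated from the database with a small stand-in. In this sketch `FlushSketch` and `written_pages` are hypothetical, and appending to an array replaces the Sequel `multi_insert` call; the point is the order of the two guards relative to `clear`.

```ruby
# Hypothetical stand-in for an includer of Bulk: each flushed page is
# appended to an array instead of being written with multi_insert.
class FlushSketch
  attr_reader :pending_inserts, :written_pages

  def initialize(dry_run: false)
    @dry_run = dry_run
    @pending_inserts = {}
    @written_pages = []
  end

  def dry_run? = @dry_run

  # Same guard order as flush_pending_inserts.
  def flush_pending_inserts
    return if self.dry_run?               # dry run: write nothing, keep the buffer
    return if self.pending_inserts.empty? # nothing buffered: skip the round trip
    @written_pages << self.pending_inserts.values # stands in for one multi_insert
    self.pending_inserts.clear
  end
end

live = FlushSketch.new
live.pending_inserts["a"] = { id: "a" }
live.flush_pending_inserts # writes one page and empties the buffer

dry = FlushSketch.new(dry_run: true)
dry.pending_inserts["a"] = { id: "a" }
dry.flush_pending_inserts  # writes nothing and leaves the buffer as-is
```

Because the dry-run guard returns before `clear`, pending_inserts is never emptied in dry-run mode. Once it reaches upsert_page_size, handle_item's size check stays true, so every later item triggers another (no-op) flush while the buffer keeps growing.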
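#handle_item(body) ⇒ Array(String, Hash), Array(nil, nil)
Add the item to the pending upserts, and flush them as a page once upsert_page_size items are pending. Return the remote key and the row being upserted, or [nil, nil] if the replicator produced no row for the body.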
    # File 'lib/webhookdb/backfiller.rb', line 81
    def handle_item(body)
      self.prepare_body(body) # expected to modify body in place; the return value is ignored
      inserting = self.upserting_replicator.upsert_webhook_body(body, upsert: false)
      return nil, nil if inserting.nil? # the replicator produced no row for this body
      k = inserting.fetch(self.remote_key_column_name)
      self.pending_inserts[k] = inserting # keyed by remote key: a repeated key replaces the earlier row
      self.flush_pending_inserts if self.pending_inserts.size >= self.upsert_page_size
      return k, inserting
    end
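The buffering in handle_item is easiest to see with a fake replicator and a tiny page size. In this sketch `StubReplicator` and `HandleItemSketch` are hypothetical: the stub returns a row only for bodies with an `"id"` key, and each flushed page is recorded as a list of keys instead of being written to a database.

```ruby
# Hypothetical replicator stub: returns a row for bodies with an "id",
# and nil (meaning "nothing to upsert") otherwise.
class StubReplicator
  def upsert_webhook_body(body, upsert:)
    raise ArgumentError, "sketch only supports upsert: false" if upsert
    body.key?("id") ? { id: body["id"], data: body } : nil
  end
end

class HandleItemSketch
  attr_reader :pending_inserts, :flushed

  def initialize(page_size)
    @page_size = page_size
    @replicator = StubReplicator.new
    @pending_inserts = {}
    @flushed = [] # each entry lists the keys written in one page
  end

  def upsert_page_size = @page_size
  def upserting_replicator = @replicator
  def remote_key_column_name = :id
  def prepare_body(_body) = nil

  def flush_pending_inserts
    return if self.pending_inserts.empty?
    @flushed << self.pending_inserts.keys
    self.pending_inserts.clear
  end

  # Same control flow as Bulk#handle_item.
  def handle_item(body)
    self.prepare_body(body)
    inserting = self.upserting_replicator.upsert_webhook_body(body, upsert: false)
    return nil, nil if inserting.nil?
    k = inserting.fetch(self.remote_key_column_name)
    self.pending_inserts[k] = inserting
    self.flush_pending_inserts if self.pending_inserts.size >= self.upsert_page_size
    return k, inserting
  end
end

sketch = HandleItemSketch.new(2)
bodies = [{ "id" => 1 }, { "no_id" => true }, { "id" => 2 }, { "id" => 3 }]
results = bodies.map { |b| sketch.handle_item(b) }
```

With a page size of 2, keys 1 and 2 are flushed together, the body without an id yields [nil, nil], and key 3 is still pending afterwards. handle_item never flushes a partial page on its own, so something outside it has to call flush_pending_inserts once the input is exhausted; this section does not show where that call happens.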
#pending_inserts ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 70
    def pending_inserts = @pending_inserts ||= {}
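Because pending_inserts is a Hash keyed by the remote key, two items with the same key in one page collapse into a single row, and the later one wins. PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE that proposes the same key twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"), so keying the buffer this way plausibly also serves to avoid that error. The event ids and fields below are made up for illustration.

```ruby
# A plain Hash keyed by remote key, like pending_inserts: a later row with
# the same key replaces the earlier one before anything is written.
pending = {}
[
  { id: "evt_1", status: "pending" },
  { id: "evt_2", status: "sent" },
  { id: "evt_1", status: "delivered" },
].each { |row| pending[row.fetch(:id)] = row }

# What flush_pending_inserts would hand to multi_insert.
rows_to_insert = pending.values
```

Ruby keeps a key at its original insertion position when its value is replaced, so `rows_to_insert` holds the "delivered" row for evt_1 first, followed by evt_2.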
#prepare_body(_body) ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 66
    # Abstract hook: includers must override it.
    def prepare_body(_body) = raise(NotImplementedError, "add/remove keys from body before upsert")
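handle_item calls prepare_body for its side effect and discards the return value, so an override has to change `body` in place. Both methods in this sketch are hypothetical overrides, and the `"uuid"` to `"id"` rename is only an example transformation.

```ruby
# Works with handle_item: mutates the same Hash object the caller holds.
def prepare_body_in_place(body)
  body["id"] = body.delete("uuid") # rename a key on the caller's Hash
  nil
end

# Does not work with handle_item: builds a new Hash that the caller never sees.
def prepare_body_copying(body)
  body.merge("id" => body["uuid"])
end

in_place = { "uuid" => "u1" }
prepare_body_in_place(in_place)

copied = { "uuid" => "u1" }
prepare_body_copying(copied)
```

After both calls, only `in_place` has been changed; `copied` still has its original `"uuid"` key, which is what upsert_webhook_body would then receive.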
#remote_key_column_name ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 68
    def remote_key_column_name = @remote_key_column_name ||= self.upserting_replicator._remote_key_column.name
#upsert_page_size ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 65
    # Abstract hook: includers must override it.
    def upsert_page_size = raise(NotImplementedError, "how many items should be upserted at a time")
#upserting_replicator ⇒ Object
    # File 'lib/webhookdb/backfiller.rb', line 67
    # Abstract hook: includers must override it.
    def upserting_replicator = raise(NotImplementedError, "the replicator being upserted")
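upsert_page_size, upserting_replicator, and prepare_body are abstract hooks: they raise until an including class overrides them. The sketch below reproduces that pattern in a hypothetical `AbstractHooks` module rather than the real Bulk module; `ExampleBackfiller`, its page size of 500, and the `:some_replicator` placeholder are all made up.

```ruby
# Hypothetical mini-module with the same "raise until overridden" hooks.
module AbstractHooks
  def upsert_page_size = raise(NotImplementedError, "how many items should be upserted at a time")
  def upserting_replicator = raise(NotImplementedError, "the replicator being upserted")
  def prepare_body(_body) = raise(NotImplementedError, "add/remove keys from body before upsert")
end

# Includes the hooks but overrides none of them.
class Unfinished
  include AbstractHooks
end

# Methods defined in the class take precedence over the included module's.
class ExampleBackfiller
  include AbstractHooks

  def upsert_page_size = 500
  def upserting_replicator = :some_replicator # placeholder for a real replicator object
  def prepare_body(body) = body.delete("internal_only")
end
```

The two-argument `raise` form matters: `NotImplementedError("...")` with parentheses is parsed as a call to a method named NotImplementedError, so it raises NoMethodError rather than the intended error.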