Module: Webhookdb::Backfiller::Bulk

Instance Method Summary collapse

Instance Method Details

#conditional_upsert? ⇒ Boolean

Should `_update_where_expr` be used or not? Defaults to false, since most bulk upserting is backfill, which should only involve upserting new rows anyway.

Returns:

  • (Boolean)


74
# File 'lib/webhookdb/backfiller.rb', line 74

# Whether the replicator's `_update_where_expr` should be applied when
# flushing pending rows. Off by default: bulk upserts are mostly backfills,
# which are expected to only insert new rows.
#
# @return [Boolean]
def conditional_upsert?
  false
end

#dry_run? ⇒ Boolean

Returns:

  • (Boolean)


76
# File 'lib/webhookdb/backfiller.rb', line 76

# Whether this backfiller is in dry-run mode. When this returns true,
# `flush_pending_inserts` returns early without touching the database.
#
# @return [Boolean] false by default.
def dry_run?
  false
end

#flush_pending_inserts ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/webhookdb/backfiller.rb', line 91

# Write every buffered row through the replicator's admin dataset in a single
# multi-row insert, then empty the buffer.
#
# Does nothing in dry-run mode or when there is nothing buffered.
# On conflict with the remote key column, the existing row is updated using
# the replicator's `_upsert_update_expr` (built from the first buffered row);
# the update is additionally guarded by `_update_where_expr` only when
# `conditional_upsert?` is true.
def flush_pending_inserts
  return if self.dry_run? || self.pending_inserts.empty?

  rows = self.pending_inserts.values
  # nil means "no extra condition on the conflict update".
  update_where = (self.upserting_replicator._update_where_expr if self.conditional_upsert?)
  self.upserting_replicator.admin_dataset(timeout: :fast) do |dataset|
    dataset.insert_conflict(
      target: self.upserting_replicator._remote_key_column.name,
      update: self.upserting_replicator._upsert_update_expr(rows.first),
      update_where:,
    ).multi_insert(rows)
  end
  # Only reached when the insert did not raise, so failed pages stay buffered.
  self.pending_inserts.clear
end

#handle_item(body) ⇒ Array(String, Hash), Array(nil)

Add the item to pending upserts, and run the page upsert if needed. Return the key, and the item being upserted.

Returns:

  • (Array(String, Hash), Array(nil))


81
82
83
84
85
86
87
88
89
# File 'lib/webhookdb/backfiller.rb', line 81

# Buffer one item for a later bulk upsert, flushing the buffer once it has
# reached `upsert_page_size` rows.
#
# The body is first passed to `prepare_body`, then converted to a row by the
# replicator with `upsert: false`, so nothing is written here directly.
# Rows are buffered by their remote key, so a later item with the same key
# replaces the earlier one.
#
# @param body [Hash] the item to upsert.
# @return [Array(String, Hash), Array(nil)] the key and the row being
#   upserted, or `[nil, nil]` when the replicator produced no row.
def handle_item(body)
  self.prepare_body(body)
  row = self.upserting_replicator.upsert_webhook_body(body, upsert: false)
  return [nil, nil] if row.nil?

  key = row.fetch(self.remote_key_column_name)
  self.pending_inserts.store(key, row)
  if self.pending_inserts.size >= self.upsert_page_size
    self.flush_pending_inserts
  end
  [key, row]
end

#pending_inserts ⇒ Object



70
71
72
73
# File 'lib/webhookdb/backfiller.rb', line 70

# Rows buffered for the next bulk insert, created lazily on first access.
# `handle_item` stores each row under its remote key.
#
# @return [Hash]
def pending_inserts
  @pending_inserts ||= {}
end
# Should `_update_where_expr` be used or not?
# Default false, since most bulk upserting is backfill,
# which should only involve upserting new rows anyway.

#prepare_body(_body) ⇒ Object



66
# File 'lib/webhookdb/backfiller.rb', line 66

# Hook for including classes: add or remove keys from the body before it is
# handed to the replicator for upsert. Must be overridden.
#
# @param _body [Hash] the item body, to be modified in place by overrides.
# @raise [NotImplementedError] always, unless overridden.
def prepare_body(_body) = raise(NotImplementedError, "add/remove keys from body before upsert")

#remote_key_column_name ⇒ Object



68
# File 'lib/webhookdb/backfiller.rb', line 68

# Name of the replicator's remote key column, looked up once and memoized.
# `handle_item` uses it to pull each row's key.
#
# @return [Object] whatever `_remote_key_column.name` returns.
def remote_key_column_name
  @remote_key_column_name ||= self.upserting_replicator._remote_key_column.name
end

#upsert_page_size ⇒ Object



65
# File 'lib/webhookdb/backfiller.rb', line 65

# Hook for including classes: how many items should be buffered before
# `handle_item` triggers a flush. Must be overridden.
#
# @return [Integer]
# @raise [NotImplementedError] always, unless overridden.
def upsert_page_size = raise(NotImplementedError, "how many items should be upserted at a time")

#upserting_replicator ⇒ Object



67
# File 'lib/webhookdb/backfiller.rb', line 67

# Hook for including classes: the replicator that rows are upserted through.
# Must be overridden.
#
# @return [Object] the replicator being upserted.
# @raise [NotImplementedError] always, unless overridden.
def upserting_replicator = raise(NotImplementedError, "the replicator being upserted")