Class: Webhookdb::LoggedWebhook::Resilient
- Inherits:
-
Object
- Object
- Webhookdb::LoggedWebhook::Resilient
- Defined in:
- lib/webhookdb/logged_webhook/resilient.rb
Instance Method Summary collapse
- #_dburl_log_kwargs(dburl) ⇒ Object
- #database_urls ⇒ Object
- #insert(kwargs) ⇒ Object
- #logger ⇒ Object
-
#replay ⇒ Object
-
For each (reachable) database: - Select 1 row, with a lock - Replay the webhook - On success, delete the row - On failure, process the next row.
-
- #write_to(dburl, service_integration_opaque_id, str_payload) ⇒ Object
Instance Method Details
#_dburl_log_kwargs(dburl) ⇒ Object
44 45 46 47 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 44

# Build the structured-log fields that identify a fallback database.
#
# Only the host and the path of the URL are reported; the path is passed through
# as-is (for example "/mydb", including the leading slash).
#
# This helper is invoked from +rescue+ handlers (see #write_to and #replay),
# so it must never raise itself: a malformed URL would otherwise replace the
# error being handled with a URI::InvalidURIError. In that case both fields are nil.
#
# @param dburl [String] Connection URL of the fallback database.
# @return [Hash] Hash with the keys +:fallback_database_host+ and +:fallback_database_name+.
def _dburl_log_kwargs(dburl)
  u = URI(dburl)
  return {fallback_database_host: u.host, fallback_database_name: u.path}
rescue URI::InvalidURIError
  # Logging context is best-effort; do not let a bad URL break the caller's error handling.
  return {fallback_database_host: nil, fallback_database_name: nil}
end
#database_urls ⇒ Object
6 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 6

# The fallback database URLs to try, as reported by
# Webhookdb::LoggedWebhook.available_resilient_database_urls.
def database_urls
  return Webhookdb::LoggedWebhook.available_resilient_database_urls
end
#insert(kwargs) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 8

# Insert a logged webhook row into the primary dataset, falling back to the
# resilient databases when the primary insert raises Sequel::DatabaseError.
#
# On the fallback path the whole +kwargs+ hash is serialized to JSON and offered
# to each URL in #database_urls, in order, until #write_to succeeds.
#
# @param kwargs [Hash] Row values; must contain +:service_integration_opaque_id+
#   (it is fetched on the fallback path, raising KeyError if missing).
# @return The result of the primary dataset insert, or +true+ if a fallback database took the row.
# @raise [Sequel::DatabaseError] The original error, if no fallback database accepted the row.
def insert(kwargs)
  return Webhookdb::LoggedWebhook.dataset.insert(kwargs)
rescue Sequel::DatabaseError => e
  opaque_id = kwargs.fetch(:service_integration_opaque_id)
  serialized = JSON.dump(kwargs)
  # The first URL that accepts the write wins; later URLs are not attempted.
  handled_by = self.database_urls.find { |candidate| self.write_to(candidate, opaque_id, serialized) }
  if handled_by
    self.logger.warn "resilient_insert_handled", error: e, **self._dburl_log_kwargs(handled_by)
    return true
  end
  self.logger.error "resilient_insert_unhandled", error: e, logged_webhook_kwargs: kwargs
  # Bare raise re-raises the primary database error that got us here.
  raise
end
#logger ⇒ Object
4 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 4

# Logger used by this class; shared with Webhookdb::LoggedWebhook.
def logger
  return Webhookdb::LoggedWebhook.logger
end
#replay ⇒ Object
-
For each (reachable) database:
-
Select 1 row, with a lock
-
Replay the webhook
-
On success, delete the row
-
On failure, process the next row
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 54

# Replay the webhooks stored in the fallback databases (see #write_to).
#
# Nothing is attempted unless the primary database answers a trivial query.
# Then, for each (reachable) fallback database in #database_urls:
# - Select 1 row, with a lock (locked rows are skipped)
# - Create a LoggedWebhook from the stored JSON payload and enqueue its replay
# - On success, delete the row from the fallback table
#
# A fallback database that raises Sequel::DatabaseError is logged and skipped,
# so the remaining databases are still processed. (Previously the method
# returned from inside the loop, so only the first URL was ever visited.)
#
# @return [Integer, nil] Number of rows replayed across all fallback databases
#   (0 when there are no fallback URLs). nil if the primary database is not ready,
#   or if fallback databases were tried and every one raised Sequel::DatabaseError.
def replay
  tblname = Webhookdb::LoggedWebhook.resilient_table_name
  begin
    Webhookdb::LoggedWebhook.db.execute("SELECT 1=1")
  rescue Sequel::DatabaseError
    self.logger.debug("resilient_replay_primary_db_not_ready")
    return nil
  end
  replayed = 0
  reached = 0
  unavailable = 0
  self.database_urls.each do |url|
    Sequel.connect(url) do |rdb|
      has_more = true
      # Keep track of the last pk we've replayed, so we can grab the next available one.
      # Otherwise we can end up spinning on the same one, especially with other threads.
      seen_pk = 0
      while has_more
        # Each row must be processed in a transaction
        rdb.transaction do
          row = rdb.from(tblname).where { pk > seen_pk }.for_update.skip_locked.order(:pk).limit(1).first
          if row.nil?
            has_more = false
            break # The break only works for the transaction
          end
          pk = row.fetch(:pk)
          seen_pk = pk
          payload = JSON.parse(row.fetch(:json_payload))
          # We replay the webhook from a separate job
          # so it can be done idempotently/exclusively.
          lwh = Webhookdb::LoggedWebhook.create(payload)
          lwh.replay_async
          replayed += 1
          rdb.from(tblname).where(pk:).delete
        end
      end
    end
    reached += 1
  rescue Sequel::DatabaseError
    # Do not return here: one unavailable fallback must not prevent replaying the others.
    unavailable += 1
    self.logger.debug("resilient_replay_fallback_unavailable", **self._dburl_log_kwargs(url))
  end
  # Keep the historical nil result when no fallback database could be processed.
  return nil if reached.zero? && unavailable.positive?
  return replayed
end
#write_to(dburl, service_integration_opaque_id, str_payload) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/webhookdb/logged_webhook/resilient.rb', line 22

# Store a serialized webhook in the fallback database at +dburl+,
# creating the resilient table first if it does not exist yet.
#
# Any StandardError (connection, DDL or insert) is logged at debug level
# and reported through the return value rather than raised.
#
# @param dburl [String] Connection URL of the fallback database.
# @param service_integration_opaque_id [String] Stored alongside the payload.
# @param str_payload [String] Serialized payload, stored in the +json_payload+ column.
# @return [Boolean] +true+ if the row was inserted, +false+ if anything failed.
def write_to(dburl, service_integration_opaque_id, str_payload)
  table = Webhookdb::LoggedWebhook.resilient_table_name
  fallback_row = {service_integration_opaque_id:, json_payload: str_payload}
  Sequel.connect(dburl, single_threaded: true) do |conn|
    begin
      conn.create_table?(table.to_sym) do
        primary_key :pk
        text :service_integration_opaque_id
        text :json_payload
      end
    rescue Sequel::UniqueConstraintViolation
      # Concurrent creation of the table is a race we cannot avoid.
      # It could be optimized if ever needed, but it is painful, so it is ignored for now.
      nil
    end
    conn.from(table).insert(fallback_row)
  end
  return true
rescue StandardError => e
  self.logger.debug "resilient_insert_failure", error: e, **self._dburl_log_kwargs(dburl)
  return false
end