Class: JetstreamBridge::OutboxRepository
- Inherits:
-
Object
- Object
- JetstreamBridge::OutboxRepository
- Defined in:
- lib/jetstream_bridge/publisher/outbox_repository.rb
Overview
Encapsulates ActiveRecord-backed outbox persistence operations.
Instance Method Summary collapse
- #already_sent?(record) ⇒ Boolean
- #find_or_build(event_id) ⇒ Object
-
#initialize(klass) ⇒ OutboxRepository
constructor
A new instance of OutboxRepository.
- #record_publish_attempt(record, subject, envelope) ⇒ Object
- #record_publish_exception(record, error) ⇒ Object
- #record_publish_failure(record, message) ⇒ Object
- #record_publish_success(record) ⇒ Object
Constructor Details
#initialize(klass) ⇒ OutboxRepository
9 10 11 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 9
#
# Builds a repository bound to the given model class.
#
# @param klass [Class] class stored in @klass; the rest of the repository
#   uses it for lookups and calls +klass.new+ to build fresh records
def initialize(klass)
  @klass = klass
end
Instance Method Details
#already_sent?(record) ⇒ Boolean
33 34 35 36 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 33
#
# Tells whether the record is already marked as sent.
#
# A record counts as sent when it exposes a truthy +sent_at+, or, failing
# that, when it exposes a +status+ equal to the string 'sent'. Both
# attributes are optional and are probed with +respond_to?+.
#
# @param record [Object] outbox record (duck-typed)
# @return [Object, Boolean] the truthy +sent_at+ value when present,
#   otherwise +true+/+false+ from the status comparison
def already_sent?(record)
  if record.respond_to?(:sent_at)
    stamp = record.sent_at
    return stamp if stamp
  end

  return false unless record.respond_to?(:status)

  record.status == 'sent'
end
#find_or_build(event_id) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 13
#
# Obtains the outbox record for an event id and, when it is already
# persisted, locks it.
#
# The record comes from ModelUtils.find_or_init_by_best, which is given
# +event_id+ as the primary criteria and +dedup_key+ as a fallback for apps
# that use a different unique column.
#
# NOTE(review): a row lock normally lasts only as long as the surrounding
# transaction - confirm callers invoke this inside one.
#
# @param event_id [Object] identifier used for both lookup criteria
# @return [Object] the found/initialized record, or a bare +@klass.new+ if
#   the row vanished before it could be locked
def find_or_build(event_id)
  candidate = ModelUtils.find_or_init_by_best(
    @klass,
    { event_id: event_id },
    { dedup_key: event_id }
  )

  # Only rows that already exist and support locking are locked.
  lockable = candidate.persisted? && !candidate.new_record? && candidate.respond_to?(:lock!)
  return candidate unless lockable

  begin
    candidate.lock!
    candidate
  rescue ActiveRecord::RecordNotFound
    # Row was deleted between find and lock: start over with a new record.
    # NOTE(review): this record is built without any attributes (no
    # event_id) - verify that later steps populate it.
    @klass.new
  end
end
#record_publish_attempt(record, subject, envelope) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 38
#
# Persists the state of a record right before a publish attempt.
#
# Inside one ActiveRecord transaction, the attributes produced by
# +build_publish_attrs+ are applied through ModelUtils.assign_known_attrs
# and the record is saved with +save!+.
#
# @param record [Object] outbox record to update
# @param subject [Object] forwarded to +build_publish_attrs+
# @param envelope [Object] forwarded to +build_publish_attrs+
# @return [Object] the value of the transaction block (result of +save!+)
def record_publish_attempt(record, subject, envelope)
  ActiveRecord::Base.transaction do
    ModelUtils.assign_known_attrs(record, build_publish_attrs(record, subject, envelope))
    record.save!
  end
end
#record_publish_exception(record, error) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 67
#
# Best-effort recording of an exception raised while publishing.
#
# Does nothing when +record+ is nil/false. Otherwise the error is formatted
# as "ErrorClass: message" and handed to +record_publish_failure+. Any
# StandardError raised along the way is logged as a warning instead of
# being re-raised.
#
# @param record [Object, nil] outbox record, if one exists
# @param error [Exception] the exception that interrupted publishing
# @return [Object, nil] result of +record_publish_failure+, nil when there
#   is no record, or the value of Logging.warn when persisting failed
def record_publish_exception(record, error)
  if record
    description = "#{error.class}: #{error.message}"
    record_publish_failure(record, description)
  end
rescue StandardError => e
  Logging.warn(
    "Failed to persist outbox failure: #{e.class}: #{e.message}",
    tag: 'JetstreamBridge::Publisher'
  )
end
#record_publish_failure(record, message) ⇒ Object
57 58 59 60 61 62 63 64 65 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 57
#
# Marks a record as failed and stores the failure message.
#
# Inside one ActiveRecord transaction, sets +status+ to 'failed' and
# +last_error+ to +message+, refreshes +updated_at+ when the record exposes
# it, applies the attributes through ModelUtils.assign_known_attrs and saves
# with +save!+.
#
# @param record [Object] outbox record to update
# @param message [String] description of the failure, stored in +last_error+
# @return [Object] the value of the transaction block (result of +save!+)
def record_publish_failure(record, message)
  ActiveRecord::Base.transaction do
    now = Time.now.utc
    attrs = { status: 'failed', last_error: message }
    # updated_at is optional on the model, so only set it when available.
    attrs[:updated_at] = now if record.respond_to?(:updated_at)
    ModelUtils.assign_known_attrs(record, attrs)
    record.save!
  end
end
#record_publish_success(record) ⇒ Object
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 46
#
# Marks a record as successfully published.
#
# Inside one ActiveRecord transaction, sets +status+ to 'sent', stamps
# +sent_at+ and +updated_at+ with the same UTC time when the record exposes
# them, applies the attributes through ModelUtils.assign_known_attrs and
# saves with +save!+.
#
# @param record [Object] outbox record to update
# @return [Object] the value of the transaction block (result of +save!+)
def record_publish_success(record)
  ActiveRecord::Base.transaction do
    stamp = Time.now.utc
    changes = { status: 'sent' }
    # Timestamp columns are optional; fill in only the ones the model has.
    %i[sent_at updated_at].each do |column|
      changes[column] = stamp if record.respond_to?(column)
    end
    ModelUtils.assign_known_attrs(record, changes)
    record.save!
  end
end