Class: JetstreamBridge::OutboxRepository
- Inherits:
-
Object
- Object
- JetstreamBridge::OutboxRepository
- Defined in:
- lib/jetstream_bridge/publisher/outbox_repository.rb
Overview
Encapsulates AR-backed outbox persistence operations.
Instance Method Summary collapse
- #already_sent?(record) ⇒ Boolean
- #find_or_build(event_id) ⇒ Object
-
#initialize(klass) ⇒ OutboxRepository
constructor
A new instance of OutboxRepository.
- #persist_exception(record, error) ⇒ Object
- #persist_failure(record, message) ⇒ Object
- #persist_pre(record, subject, envelope) ⇒ Object
- #persist_success(record) ⇒ Object
Constructor Details
#initialize(klass) ⇒ OutboxRepository
Returns a new instance of OutboxRepository.
9 10 11 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 9 def initialize(klass) @klass = klass end |
Instance Method Details
#already_sent?(record) ⇒ Boolean
33 34 35 36 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 33 def already_sent?(record) (record.respond_to?(:sent_at) && record.sent_at) || (record.respond_to?(:status) && record.status == 'sent') end |
#find_or_build(event_id) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 13 def find_or_build(event_id) record = ModelUtils.find_or_init_by_best( @klass, { event_id: event_id }, { dedup_key: event_id } # fallback if app uses a different unique column ) # Lock the row to prevent concurrent processing if record.persisted? && !record.new_record? && record.respond_to?(:lock!) begin record.lock! rescue ActiveRecord::RecordNotFound # Record was deleted between find and lock, create new record = @klass.new end end record end |
#persist_exception(record, error) ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 81 def persist_exception(record, error) return unless record persist_failure(record, "#{error.class}: #{error.}") rescue StandardError => e Logging.warn("Failed to persist outbox failure: #{e.class}: #{e.}", tag: 'JetstreamBridge::Publisher') end |
#persist_failure(record, message) ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 71 def persist_failure(record, ) ActiveRecord::Base.transaction do now = Time.now.utc attrs = { status: 'failed', last_error: } attrs[:updated_at] = now if record.respond_to?(:updated_at) ModelUtils.assign_known_attrs(record, attrs) record.save! end end |
#persist_pre(record, subject, envelope) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 38 def persist_pre(record, subject, envelope) ActiveRecord::Base.transaction do now = Time.now.utc event_id = envelope['event_id'].to_s attrs = { event_id: event_id, subject: subject, payload: ModelUtils.json_dump(envelope), headers: ModelUtils.json_dump({ 'nats-msg-id' => event_id }), status: 'publishing', last_error: nil } attrs[:attempts] = 1 + (record.attempts || 0) if record.respond_to?(:attempts) attrs[:enqueued_at] = (record.enqueued_at || now) if record.respond_to?(:enqueued_at) attrs[:updated_at] = now if record.respond_to?(:updated_at) ModelUtils.assign_known_attrs(record, attrs) record.save! end end |
#persist_success(record) ⇒ Object
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 60 def persist_success(record) ActiveRecord::Base.transaction do now = Time.now.utc attrs = { status: 'sent' } attrs[:sent_at] = now if record.respond_to?(:sent_at) attrs[:updated_at] = now if record.respond_to?(:updated_at) ModelUtils.assign_known_attrs(record, attrs) record.save! end end |