Class: JetstreamBridge::InboxRepository
- Inherits:
-
Object
- Object
- JetstreamBridge::InboxRepository
- Defined in:
- lib/jetstream_bridge/consumer/inbox/inbox_repository.rb
Overview
AR-facing operations for inbox rows (find/build/persist).
Instance Method Summary collapse
- #already_processed?(record) ⇒ Boolean
- #find_or_build(msg) ⇒ Object
-
#initialize(klass) ⇒ InboxRepository
constructor
A new instance of InboxRepository.
- #lock_record(record) ⇒ Object
- #persist_failure(record, error) ⇒ Object
- #persist_post(record) ⇒ Object
- #persist_pre(record, msg) ⇒ Object
Constructor Details
#initialize(klass) ⇒ InboxRepository
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 9 def initialize(klass) @klass = klass end |
Instance Method Details
#already_processed?(record) ⇒ Boolean
25 26 27 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 25 def already_processed?(record) record.respond_to?(:processed_at) && record.processed_at end |
#find_or_build(msg) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 13 def find_or_build(msg) record = if ModelUtils.has_columns?(@klass, :event_id) @klass.find_or_initialize_by(event_id: msg.event_id) elsif ModelUtils.has_columns?(@klass, :stream_seq) @klass.find_or_initialize_by(stream_seq: msg.seq) else @klass.new end lock_record(record) end |
#lock_record(record) ⇒ Object
90 91 92 93 94 95 96 97 98 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 90 def lock_record(record) return record unless record.respond_to?(:persisted?) && record.persisted? return record unless record.respond_to?(:lock!) record.lock! record rescue ActiveRecord::RecordNotFound @klass.new end |
#persist_failure(record, error) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 69 def persist_failure(record, error) return unless record ActiveRecord::Base.transaction do now = Time.now.utc error_msg = "#{error.class}: #{error.message}" attrs = { status: 'failed', error_message: error_msg, # Standard field name last_error: error_msg, # Legacy field (for backwards compatibility) failed_at: record.respond_to?(:failed_at) ? now : nil, updated_at: record.respond_to?(:updated_at) ? now : nil } ModelUtils.assign_known_attrs(record, attrs) record.save! end rescue StandardError => e Logging.warn("Failed to persist inbox failure: #{e.class}: #{e.message}", tag: 'JetstreamBridge::Consumer') end |
#persist_post(record) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 56 def persist_post(record) ActiveRecord::Base.transaction do now = Time.now.utc attrs = { status: 'processed', processed_at: record.respond_to?(:processed_at) ? now : nil, updated_at: record.respond_to?(:updated_at) ? now : nil } ModelUtils.assign_known_attrs(record, attrs) record.save! end end |
#persist_pre(record, msg) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 29 def persist_pre(record, msg) ActiveRecord::Base.transaction do attrs = { event_id: msg.event_id, event_type: msg.body['type'] || msg.body['event_type'], resource_type: msg.body['resource_type'], resource_id: msg.body['resource_id'], subject: msg.subject, payload: ModelUtils.json_dump(msg.body_for_store), headers: ModelUtils.json_dump(msg.headers), stream: msg.stream, stream_seq: msg.seq, deliveries: msg.deliveries, status: 'processing', error_message: nil, # Clear any previous error last_error: nil, # Legacy field (for backwards compatibility) processing_attempts: (record.respond_to?(:processing_attempts) ? (record.processing_attempts || 0) + 1 : nil), received_at: record.respond_to?(:received_at) ? (record.received_at || msg.now) : nil, updated_at: record.respond_to?(:updated_at) ? msg.now : nil } # Some schemas capture the producing app attrs[:source_app] = msg.body['producer'] || msg.headers['producer'] if record.respond_to?(:source_app=) ModelUtils.assign_known_attrs(record, attrs) record.save! end end |