Class: JetstreamBridge::InboxRepository
- Inherits:
-
Object
- Object
- JetstreamBridge::InboxRepository
- Defined in:
- lib/jetstream_bridge/consumer/inbox/inbox_repository.rb
Overview
ActiveRecord-facing operations for inbox rows (find/build/persist).
Instance Method Summary collapse
- #already_processed?(record) ⇒ Boolean
- #find_or_build(msg) ⇒ Object
-
#initialize(klass) ⇒ InboxRepository
constructor
A new instance of InboxRepository.
- #lock_record(record) ⇒ Object
- #persist_failure(record, error) ⇒ Object
- #persist_post(record) ⇒ Object
- #persist_pre(record, msg) ⇒ Object
Constructor Details
#initialize(klass) ⇒ InboxRepository
Returns a new instance of InboxRepository.
9 10 11 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 9
# Builds a repository bound to a single inbox model class.
#
# @param klass [Class] model class the repository uses to look up and
#   instantiate inbox rows; stored unchanged in @klass
def initialize(klass)
  @klass = klass
end
Instance Method Details
#already_processed?(record) ⇒ Boolean
25 26 27 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 25
# Tells whether the given inbox row carries a processed_at value.
#
# @param record [Object] inbox row (or any object) to inspect
# @return [Object, false, nil] the record's processed_at value when the record
#   exposes that reader (truthy once set), otherwise the falsy result of the
#   respond_to? check
def already_processed?(record)
  tracks_processing = record.respond_to?(:processed_at)
  tracks_processing && record.processed_at
end
#find_or_build(msg) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 13
# Finds the inbox row for a message, or builds a new unsaved one, and passes
# the result through #lock_record.
#
# The lookup key depends on which columns ModelUtils.has_columns? reports for
# the model: event_id is preferred, stream_seq is the fallback, and when
# neither is reported a blank instance is built with no lookup at all.
#
# @param msg [Object] incoming message; must respond to event_id or seq,
#   depending on which lookup column is used
# @return [Object] whatever #lock_record returns for the found or built row
def find_or_build(msg)
  lookup =
    if ModelUtils.has_columns?(@klass, :event_id)
      { event_id: msg.event_id }
    elsif ModelUtils.has_columns?(@klass, :stream_seq)
      { stream_seq: msg.seq }
    end

  candidate = lookup ? @klass.find_or_initialize_by(**lookup) : @klass.new
  lock_record(candidate)
end
#lock_record(record) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 80
# Calls lock! on a record that is already persisted and supports locking.
#
# Records that are not persisted, or that do not respond to persisted? or
# lock!, are returned untouched.
#
# @param record [Object] inbox row to lock
# @return [Object] the same record, or a fresh @klass instance when
#   ActiveRecord::RecordNotFound is raised while locking
def lock_record(record)
  lockable = record.respond_to?(:persisted?) && record.persisted? && record.respond_to?(:lock!)
  record.lock! if lockable
  record
rescue ActiveRecord::RecordNotFound
  # Fall back to a brand-new, unsaved row when the lock attempt reports the
  # record as missing.
  @klass.new
end
#persist_failure(record, error) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 62
# Marks an inbox row as failed and stores a description of the error.
#
# This is best-effort: any StandardError raised while saving is logged through
# Logging.warn instead of being propagated to the caller.
#
# @param record [Object, nil] inbox row to update; nothing happens when nil
# @param error [Exception] the failure to record, stored in last_error as
#   "ErrorClass: message"
# @return [Object, nil] result of the save (or of Logging.warn when the save
#   failed); nil when no record was given
def persist_failure(record, error)
  return unless record

  ActiveRecord::Base.transaction do
    now = Time.now.utc
    attrs = {
      status: 'failed',
      last_error: "#{error.class}: #{error.message}",
      updated_at: record.respond_to?(:updated_at) ? now : nil
    }
    ModelUtils.assign_known_attrs(record, attrs)
    record.save!
  end
rescue StandardError => e
  # Do not let a bookkeeping failure mask the original processing error.
  Logging.warn("Failed to persist inbox failure: #{e.class}: #{e.message}",
               tag: 'JetstreamBridge::Consumer')
end
#persist_post(record) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 49
# Marks an inbox row as processed and saves it inside a transaction.
#
# processed_at and updated_at receive the current UTC time only when the
# record responds to the matching reader; otherwise nil is passed for that key.
# Errors raised by save! are not rescued here.
#
# @param record [Object] inbox row to update
# @return [Object] whatever the transaction block yields (the save! result)
def persist_post(record)
  ActiveRecord::Base.transaction do
    stamp = Time.now.utc
    changes = {
      status: 'processed',
      processed_at: (stamp if record.respond_to?(:processed_at)),
      updated_at: (stamp if record.respond_to?(:updated_at))
    }
    ModelUtils.assign_known_attrs(record, changes)
    record.save!
  end
end
#persist_pre(record, msg) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 29
# Records an incoming message on its inbox row before processing starts.
#
# Copies the message's identifiers, subject, stream position and delivery
# count, stores payload and headers via ModelUtils.json_dump, sets status to
# 'processing', clears last_error, and saves inside a transaction.
# Errors raised by save! are not rescued here.
#
# @param record [Object] inbox row to fill in
# @param msg [Object] message exposing event_id, subject, body_for_store,
#   headers, stream, seq, deliveries and now
# @return [Object] whatever the transaction block yields (the save! result)
def persist_pre(record, msg)
  ActiveRecord::Base.transaction do
    message_fields = {
      event_id: msg.event_id,
      subject: msg.subject,
      payload: ModelUtils.json_dump(msg.body_for_store),
      headers: ModelUtils.json_dump(msg.headers),
      stream: msg.stream,
      stream_seq: msg.seq,
      deliveries: msg.deliveries
    }
    bookkeeping = {
      status: 'processing',
      last_error: nil,
      # Keep an existing received_at so redeliveries do not overwrite it.
      received_at: (record.received_at || msg.now if record.respond_to?(:received_at)),
      updated_at: (msg.now if record.respond_to?(:updated_at))
    }
    ModelUtils.assign_known_attrs(record, message_fields.merge(bookkeeping))
    record.save!
  end
end