Class: JetstreamBridge::InboxProcessor
- Inherits:
-
Object
- Object
- JetstreamBridge::InboxProcessor
- Defined in:
- lib/jetstream_bridge/consumer/inbox/inbox_processor.rb
Overview
Orchestrates AR-backed inbox processing.
Instance Method Summary collapse
-
#initialize(message_processor) ⇒ InboxProcessor
constructor
A new instance of InboxProcessor.
-
#process(msg) ⇒ true, false
Whether the message was processed successfully.
Constructor Details
#initialize(message_processor) ⇒ InboxProcessor
Returns a new instance of InboxProcessor.
11 12 13 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_processor.rb', line 11
#
# Stores the collaborator that handles each message and applies its outcome.
#
# @param message_processor [Object] kept in +@processor+; #process invokes it
#   and sends it +apply_action+ / +safe_nak+.
# @return [InboxProcessor] a new instance of InboxProcessor
def initialize(message_processor)
  @processor = message_processor
end
Instance Method Details
#process(msg) ⇒ true, false
Returns whether the message was processed successfully.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/jetstream_bridge/consumer/inbox/inbox_processor.rb', line 16
#
# Runs a single message through the inbox flow: look up (or build) the inbox
# record, skip it when already processed, otherwise persist a pre-processing
# state, invoke the message processor, and persist the outcome.
#
# When the configured inbox model is not an ActiveRecord class the call is
# delegated to +process_direct?+ and its result is returned unchanged.
#
# @param msg [Object] the incoming message; it is wrapped with
#   +InboxMessage.from_nats+ before use.
# @return [true, false] +true+ when the record was already processed or the
#   processor returned an +:ack+ action; +false+ for +:nak+, for a missing or
#   unknown action, and when any StandardError is raised.
def process(msg)
  klass = ModelUtils.constantize(JetstreamBridge.config.inbox_model)
  return process_direct?(msg, klass) unless ModelUtils.ar_class?(klass)

  msg    = InboxMessage.from_nats(msg)
  repo   = InboxRepository.new(klass)
  record = repo.find_or_build(msg)

  # Duplicate delivery: acknowledge and stop without re-running the handler.
  if repo.already_processed?(record)
    msg.ack
    return true
  end

  repo.persist_pre(record, msg)

  # auto_ack is disabled so the ack/nak is applied only after the inbox record
  # has been updated below.
  # NOTE(review): `.()` is shorthand for `.call`; an explicit method name may
  # have been lost from this listing — confirm against the processor's API.
  action = @processor.(msg, auto_ack: false)

  case action&.action
  when :ack
    repo.persist_post(record)
    @processor.send(:apply_action, msg, action)
    true
  when :nak
    repo.persist_failure(record, action.error || StandardError.new('Inbox processing failed'))
    @processor.send(:apply_action, msg, action)
    false
  else
    # No usable action: record the failure; nothing is applied to the message.
    repo.persist_failure(record, StandardError.new('Inbox processing returned no action'))
    false
  end
rescue StandardError => e
  # repo/record are nil when the error was raised before they were assigned.
  repo.persist_failure(record, e) if repo && record
  Logging.error("Inbox processing failed: #{e.class}: #{e.message}", tag: 'JetstreamBridge::Consumer')
  # Ensure the message is retried if possible
  @processor.send(:safe_nak, msg, nil, e, delay: nil) if msg.respond_to?(:nak)
  false
end