Class: JetstreamBridge::MessageProcessor
- Inherits: Object
  - Object
    - JetstreamBridge::MessageProcessor
- Defined in: lib/jetstream_bridge/consumer/message_processor.rb
Overview
Orchestrates parse → handler → ack/nak → DLQ
Defined Under Namespace
Classes: ActionResult
Constant Summary
- UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze
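This page does not show where `UNRECOVERABLE_ERRORS` is consumed. One plausible reading, given the ack/nak → DLQ flow in the overview, is that these exception classes mark failures a retry cannot fix, so the message is routed to the DLQ instead of being redelivered. The sketch below illustrates only that idea; `classify_failure` and the `:dlq` / `:nak` return values are hypothetical and not part of JetstreamBridge.

```ruby
# Mirrors the constant documented above.
UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze

# Hypothetical helper (not part of JetstreamBridge): choose what to do
# with a failed message based on the exception class.
def classify_failure(error)
  # is_a? also matches subclasses of the listed classes.
  if UNRECOVERABLE_ERRORS.any? { |klass| error.is_a?(klass) }
    :dlq # retrying cannot fix a bad argument or type, so park the message
  else
    :nak # possibly transient, so ask for redelivery
  end
end

puts classify_failure(ArgumentError.new('bad payload'))
puts classify_failure(IOError.new('connection reset'))
```

Keeping the list frozen and matching with `is_a?` means a custom error that subclasses `ArgumentError` would be treated the same way as its parent.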
Instance Attribute Summary
- #middleware_chain ⇒ Object (readonly)
  Returns the value of attribute middleware_chain.
Instance Method Summary
- #handle_message(msg, auto_ack: true) ⇒ Object
- #initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor (constructor)
  A new instance of MessageProcessor.
Constructor Details
#initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 57

def initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil)
  @jts = jts
  @handler = handler
  @dlq = dlq || DlqPublisher.new(jts)
  @backoff = backoff || BackoffStrategy.new
  @middleware_chain = middleware_chain || ConsumerMiddleware::MiddlewareChain.new
end
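Each collaborator falls back to a freshly built default only when its keyword is `nil`, so any of them can be swapped out, for example in tests. The sketch below reproduces that pattern with stand-in classes so it runs without the gem. `InjectionSketch`, `DefaultDlq`, and `RecordingDlq` are hypothetical names; the real `DlqPublisher` interface is not shown on this page.

```ruby
# Stand-in for the default DLQ publisher (hypothetical; the real
# DlqPublisher interface is not shown on this page).
class DefaultDlq
  def initialize(jts)
    @jts = jts
  end
end

# Hypothetical test double that records what it is asked to publish.
class RecordingDlq
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(msg)
    @published << msg
  end
end

# Same shape as the constructor above, reduced to a single collaborator.
class InjectionSketch
  attr_reader :dlq

  def initialize(jts, handler, dlq: nil)
    @jts = jts
    @handler = handler
    @dlq = dlq || DefaultDlq.new(jts) # build the default only when none is injected
  end
end

handler = ->(event) { event }

default_processor = InjectionSketch.new(:fake_jts, handler)
injected_processor = InjectionSketch.new(:fake_jts, handler, dlq: RecordingDlq.new)

puts default_processor.dlq.class
puts injected_processor.dlq.class
```

One consequence of the `||` idiom is that passing `false` also triggers the default, which is rarely a concern for object collaborators like these.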
Instance Attribute Details
#middleware_chain ⇒ Object (readonly)
Returns the value of attribute middleware_chain.
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 55

def middleware_chain
  @middleware_chain
end
Instance Method Details
#handle_message(msg, auto_ack: true) ⇒ Object
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 65

def handle_message(msg, auto_ack: true)
  ctx = MessageContext.build(msg)
  # The name of the helper called on the next line is missing from this
  # listing; it returns the event or an early action for the message.
  event, early_action = (msg, ctx)
  return apply_action(msg, early_action) if early_action && auto_ack
  return early_action if early_action

  result = process_event(msg, event, ctx)
  apply_action(msg, result) if auto_ack
  result
rescue StandardError => e
  # Any unexpected crash is logged and converted into a :nak result.
  backtrace = e.backtrace&.first(5)&.join("\n ")
  Logging.error(
    "Processor crashed event_id=#{ctx&.event_id} subject=#{ctx&.subject} seq=#{ctx&.seq} " \
    "deliveries=#{ctx&.deliveries} err=#{e.class}: #{e.message}\n #{backtrace}",
    tag: 'JetstreamBridge::Consumer'
  )
  action = ActionResult.new(action: :nak, ctx: ctx, error: e)
  apply_action(msg, action) if auto_ack
  action
end
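With `auto_ack: true` the processor applies the resulting action to the message itself; with `auto_ack: false` it only returns the result and leaves acknowledgement to the caller. The sketch below models that contract with stand-ins so it runs without the gem. `FakeMessage`, `SketchResult`, `AutoAckSketch`, and its private `apply` method are hypothetical; the real `ActionResult` and `apply_action` are not shown on this page beyond the `action:`, `ctx:`, and `error:` keywords.

```ruby
# Hypothetical result object with the three fields the listing passes
# to ActionResult.new.
SketchResult = Struct.new(:action, :ctx, :error, keyword_init: true)

# Hypothetical message double that records ack/nak calls.
class FakeMessage
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack
    @calls << :ack
  end

  def nak
    @calls << :nak
  end
end

# Reduced model of handle_message: run the handler, map the outcome to an
# action, and apply it only when auto_ack is true.
class AutoAckSketch
  def initialize(handler)
    @handler = handler
  end

  def handle_message(msg, auto_ack: true)
    result = begin
      @handler.call(msg)
      SketchResult.new(action: :ack)
    rescue StandardError => e
      SketchResult.new(action: :nak, error: e) # failures become a nak result
    end
    apply(msg, result) if auto_ack
    result
  end

  private

  # Stand-in for apply_action: forward the chosen action to the message.
  def apply(msg, result)
    msg.public_send(result.action)
  end
end

processor = AutoAckSketch.new(->(_msg) { raise 'boom' })

auto = FakeMessage.new
processor.handle_message(auto) # the nak is applied by the processor

manual = FakeMessage.new
result = processor.handle_message(manual, auto_ack: false)
manual.public_send(result.action) # the caller applies the action itself
```

Returning the result in both modes lets a caller that batches messages inspect every outcome first and decide when to acknowledge.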