Class: JetstreamBridge::MessageProcessor
- Inherits: Object
  - Object
  - JetstreamBridge::MessageProcessor
- Defined in: lib/jetstream_bridge/consumer/message_processor.rb
Overview
Orchestrates the parse -> handler -> ack/nak -> DLQ pipeline for each message.
Responsible for deserializing incoming NATS messages, running them through the middleware chain and handler, and deciding whether to ACK, NAK, or route to the dead letter queue.
Defined Under Namespace
Classes: ActionResult
Constant Summary
- UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze
  Error types that skip retry and go straight to the DLQ.
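To illustrate how a list like this can drive the retry-versus-DLQ decision, here is a minimal sketch. The `action_for` helper and the `:dlq` symbol are hypothetical and are not taken from the gem; only the constant's value and the `:nak` action come from this page.

```ruby
# Mirrors the documented constant so the sketch runs on its own.
UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze

# Hypothetical helper: returns :dlq for errors that should skip retry,
# and :nak (redeliver later) for everything else.
def action_for(error)
  UNRECOVERABLE_ERRORS.any? { |klass| error.is_a?(klass) } ? :dlq : :nak
end

action_for(TypeError.new('wrong type'))   # unrecoverable, skips retry
action_for(RuntimeError.new('timeout'))   # retried via NAK
```

Because the check uses `is_a?`, subclasses of the listed error types are treated as unrecoverable too.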
Instance Attribute Summary
- #middleware_chain ⇒ ConsumerMiddleware::MiddlewareChain (readonly)
  The middleware chain that each message is run through.
Instance Method Summary
- #handle_message(msg, auto_ack: true) ⇒ ActionResult
  Process a single NATS message through the full pipeline.
- #initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor (constructor)
  A new instance of MessageProcessor.
Constructor Details
#initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor
Returns a new instance of MessageProcessor.
    # File 'lib/jetstream_bridge/consumer/message_processor.rb', line 104
    def initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil)
      @jts = jts
      @handler = handler
      @dlq = dlq || DlqPublisher.new(jts)
      @backoff = backoff || BackoffStrategy.new
      @middleware_chain = middleware_chain || ConsumerMiddleware::MiddlewareChain.new
    end
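The optional keyword arguments follow a common Ruby injection idiom: pass a collaborator to override it, or omit it to get the default. The sketch below reproduces only that idiom with stand-in classes; `SketchProcessor` and `DefaultBackoff` are hypothetical names, not part of the gem.

```ruby
# Stand-in for illustration only; the real class is JetstreamBridge::MessageProcessor.
class SketchProcessor
  # Hypothetical placeholder for the default BackoffStrategy.
  DefaultBackoff = Struct.new(:label)

  attr_reader :backoff

  def initialize(backoff: nil)
    # Same idiom as the constructor above: use the default when the
    # argument is nil (or false), otherwise keep the injected object.
    @backoff = backoff || DefaultBackoff.new('default')
  end
end

custom       = Object.new
with_default = SketchProcessor.new
with_custom  = SketchProcessor.new(backoff: custom)
```

This is what makes it practical to pass test doubles for `dlq:`, `backoff:`, or `middleware_chain:` in specs while production code relies on the defaults.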
Instance Attribute Details
#middleware_chain ⇒ ConsumerMiddleware::MiddlewareChain (readonly)
Returns the middleware chain.
    # File 'lib/jetstream_bridge/consumer/message_processor.rb', line 97
    def middleware_chain
      @middleware_chain
    end
Instance Method Details
#handle_message(msg, auto_ack: true) ⇒ ActionResult
Process a single NATS message through the full pipeline.
    # File 'lib/jetstream_bridge/consumer/message_processor.rb', line 117
    def handle_message(msg, auto_ack: true)
      ctx = MessageContext.build(msg)
      # NOTE: the name of the helper called on the next line is missing from this listing.
      event, early_action = (msg, ctx)
      return apply_action(msg, early_action) if early_action && auto_ack
      return early_action if early_action

      result = process_event(msg, event, ctx)
      apply_action(msg, result) if auto_ack
      result
    rescue StandardError => e
      backtrace = e.backtrace&.first(5)&.join("\n ")
      Logging.error(
        "Processor crashed event_id=#{ctx&.event_id} subject=#{ctx&.subject} seq=#{ctx&.seq} " \
        "deliveries=#{ctx&.deliveries} err=#{e.class}: #{e.message}\n #{backtrace}",
        tag: 'JetstreamBridge::Consumer'
      )
      action = ActionResult.new(action: :nak, ctx: ctx, error: e)
      apply_action(msg, action) if auto_ack
      action
    end
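With `auto_ack: false`, the method returns the ActionResult without calling `apply_action`, so the caller decides when to acknowledge. The sketch below shows what caller-side handling could look like. It runs without the gem: `ActionResult` here is a stand-in Struct with the same keyword fields used above, and `FakeMsg`, `apply_result`, and the `:ack` action are assumptions made for illustration.

```ruby
# Stand-in with the keyword fields seen in the listing above (action, ctx, error).
ActionResult = Struct.new(:action, :ctx, :error, keyword_init: true)

# Fake message that records which acknowledgement was sent.
class FakeMsg
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack
    @calls << :ack
  end

  def nak
    @calls << :nak
  end
end

# Hypothetical caller-side dispatch for a result obtained with auto_ack: false.
def apply_result(msg, result)
  case result.action
  when :ack then msg.ack
  when :nak then msg.nak
  else raise ArgumentError, "unhandled action: #{result.action.inspect}"
  end
  result
end

msg    = FakeMsg.new
result = ActionResult.new(action: :nak, error: RuntimeError.new('boom'))
apply_result(msg, result)
```

Deferring the acknowledgement this way can be useful when the ACK has to wait for something outside the processor, such as a database transaction commit.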