Class: JetstreamBridge::MessageProcessor

Inherits:
Object
Defined in:
lib/jetstream_bridge/consumer/message_processor.rb

Overview

Orchestrates the parse -> handler -> ack/nak -> DLQ pipeline for each message.

Responsible for deserializing incoming NATS messages, running them through the middleware chain and handler, and deciding whether to ACK, NAK, or route to the dead letter queue.

Defined Under Namespace

Classes: ActionResult

Constant Summary

UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze

Error types that skip retry and go straight to the DLQ.
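The routing rule implied by this constant can be sketched as follows. This is a minimal illustration, not the library's implementation: the module name `ErrorRoutingSketch`, the method `action_for`, and the use of `is_a?` (which also matches subclasses) are assumptions made for the example.

```ruby
# Hypothetical sketch of routing errors by class; names here are illustrative only.
module ErrorRoutingSketch
  # Mirrors the documented constant.
  UNRECOVERABLE_ERRORS = [ArgumentError, TypeError].freeze

  # Returns :dlq when retrying cannot help, :nak when a redelivery might succeed.
  # ASSUMPTION: subclasses of the listed errors are treated as unrecoverable too.
  def self.action_for(error)
    unrecoverable = UNRECOVERABLE_ERRORS.any? { |klass| error.is_a?(klass) }
    unrecoverable ? :dlq : :nak
  end
end

ErrorRoutingSketch.action_for(ArgumentError.new('bad payload'))  # => :dlq
ErrorRoutingSketch.action_for(IOError.new('connection reset'))   # => :nak
```

The idea is that an ArgumentError or TypeError usually signals a malformed payload or a programming error, so redelivering the same message would likely fail the same way.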

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor

Returns a new instance of MessageProcessor.

Parameters:

  • jts (NATS::JetStream)

    JetStream context

  • handler (#call)

    User-provided event handler

  • dlq (DlqPublisher, nil) (defaults to: nil)

    DLQ publisher (auto-created if nil)

  • backoff (BackoffStrategy, nil) (defaults to: nil)

    Backoff strategy (auto-created if nil)

  • middleware_chain (ConsumerMiddleware::MiddlewareChain, nil) (defaults to: nil)

    Middleware chain



# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 104

def initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil)
  @jts              = jts
  @handler          = handler
  @dlq              = dlq || DlqPublisher.new(jts)
  @backoff          = backoff || BackoffStrategy.new
  @middleware_chain = middleware_chain || ConsumerMiddleware::MiddlewareChain.new
end
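Because the handler parameter is typed as `(#call)`, any object that responds to `call` should be accepted. The sketch below shows two such objects. `RecordingHandler` is a hypothetical class written for this example, and the single `event` argument is an assumption, since this page does not show how the handler is invoked.

```ruby
# Class-based handler: any object with a #call method qualifies.
# ASSUMPTION: the handler is called with a single event argument.
class RecordingHandler
  attr_reader :events

  def initialize
    @events = []
  end

  # Stores each event so it can be inspected later.
  def call(event)
    @events << event
  end
end

# Lambdas and procs respond to #call as well.
lambda_handler = ->(event) { "handled #{event.inspect}" }

recording_handler = RecordingHandler.new
recording_handler.call({ 'id' => 1 })
lambda_handler.call({ 'id' => 2 })

# Wiring into the processor is left as comments because it needs a live
# JetStream context (jts); custom_dlq is a hypothetical DlqPublisher instance.
#   processor = JetstreamBridge::MessageProcessor.new(jts, recording_handler)
#   processor = JetstreamBridge::MessageProcessor.new(jts, lambda_handler, dlq: custom_dlq)
```

Passing `dlq:`, `backoff:`, or `middleware_chain:` replaces the corresponding auto-created default, which can be convenient for injecting test doubles.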

Instance Attribute Details

#middleware_chain ⇒ ConsumerMiddleware::MiddlewareChain (readonly)

Returns the middleware chain.

Returns:

  • (ConsumerMiddleware::MiddlewareChain)

    Middleware chain

# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 97

def middleware_chain
  @middleware_chain
end

Instance Method Details

#handle_message(msg, auto_ack: true) ⇒ ActionResult

Process a single NATS message through the full pipeline.

Parameters:

  • msg (NATS::Msg)

    Raw NATS message

  • auto_ack (Boolean) (defaults to: true)

    Whether to automatically ACK/NAK the message

Returns:

  • (ActionResult)

    Result describing the action chosen for the message

# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 117

def handle_message(msg, auto_ack: true)
  ctx = MessageContext.build(msg)
  event, early_action = parse_message(msg, ctx)
  return apply_action(msg, early_action) if early_action && auto_ack
  return early_action if early_action

  result = process_event(msg, event, ctx)
  apply_action(msg, result) if auto_ack
  result
rescue StandardError => e
  backtrace = e.backtrace&.first(5)&.join("\n  ")
  Logging.error(
    "Processor crashed event_id=#{ctx&.event_id} subject=#{ctx&.subject} seq=#{ctx&.seq} " \
    "deliveries=#{ctx&.deliveries} err=#{e.class}: #{e.message}\n  #{backtrace}",
    tag: 'JetstreamBridge::Consumer'
  )
  action = ActionResult.new(action: :nak, ctx: ctx, error: e)
  apply_action(msg, action) if auto_ack
  action
end
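The effect of `auto_ack` can be illustrated with a simplified, self-contained sketch of the same control flow. Everything here is a stand-in: `FakeMsg`, `handle_message_sketch`, and the Struct-based `ActionResult` are hypothetical, and the real method also parses the message, runs middleware, and may route to the DLQ.

```ruby
# Stand-in for the nested ActionResult class.
# ASSUMPTION: fields mirror the keywords used in the source above (action, ctx, error).
ActionResult = Struct.new(:action, :ctx, :error, keyword_init: true)

# Fake message that records which acknowledgement it received, if any.
class FakeMsg
  attr_reader :acked_with

  def ack
    @acked_with = :ack
  end

  def nak
    @acked_with = :nak
  end
end

# Simplified stand-in for handle_message: ACK on success, NAK on any StandardError.
# The result is returned either way; it is only applied to the message when auto_ack is true.
def handle_message_sketch(msg, handler, auto_ack: true)
  handler.call(msg)
  result = ActionResult.new(action: :ack)
  msg.ack if auto_ack
  result
rescue StandardError => e
  result = ActionResult.new(action: :nak, error: e)
  msg.nak if auto_ack
  result
end

failing_handler = ->(_msg) { raise IOError, 'boom' }

manual_msg = FakeMsg.new
result = handle_message_sketch(manual_msg, failing_handler, auto_ack: false)
result.action          # => :nak
manual_msg.acked_with  # => nil (the caller decides when to acknowledge)
```

With `auto_ack: false` the caller receives the result and is responsible for acknowledging the message, which may help when several messages are processed as a batch.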