Class: JetstreamBridge::MessageProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/message_processor.rb

Overview

Orchestrates parse → handler → ack/nak → DLQ

Defined Under Namespace

Classes: ActionResult

Constant Summary collapse

UNRECOVERABLE_ERRORS =
[ArgumentError, TypeError].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil) ⇒ MessageProcessor



57
58
59
60
61
62
63
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 57

def initialize(jts, handler, dlq: nil, backoff: nil, middleware_chain: nil)
  @jts              = jts
  @handler          = handler
  @dlq              = dlq || DlqPublisher.new(jts)
  @backoff          = backoff || BackoffStrategy.new
  @middleware_chain = middleware_chain || ConsumerMiddleware::MiddlewareChain.new
end

Instance Attribute Details

#middleware_chainObject (readonly)

Returns the value of attribute middleware_chain.



55
56
57
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 55

def middleware_chain
  @middleware_chain
end

Instance Method Details

#handle_message(msg, auto_ack: true) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/jetstream_bridge/consumer/message_processor.rb', line 65

def handle_message(msg, auto_ack: true)
  ctx = MessageContext.build(msg)
  event, early_action = parse_message(msg, ctx)
  return apply_action(msg, early_action) if early_action && auto_ack
  return early_action if early_action

  result = process_event(msg, event, ctx)
  apply_action(msg, result) if auto_ack
  result
rescue StandardError => e
  backtrace = e.backtrace&.first(5)&.join("\n  ")
  Logging.error(
    "Processor crashed event_id=#{ctx&.event_id} subject=#{ctx&.subject} seq=#{ctx&.seq} " \
    "deliveries=#{ctx&.deliveries} err=#{e.class}: #{e.message}\n  #{backtrace}",
    tag: 'JetstreamBridge::Consumer'
  )
  action = ActionResult.new(action: :nak, ctx: ctx, error: e)
  apply_action(msg, action) if auto_ack
  action
end