Class: JetstreamBridge::Consumer

Inherits:
Object
Defined in:
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/consumer/middleware.rb

Overview

Subscribes to the destination subject and processes messages through a durable pull consumer.

The Consumer provides reliable message processing with features like:

  • Durable pull-based subscriptions with configurable batch sizes

  • Optional idempotent inbox pattern for exactly-once processing

  • Middleware support for cross-cutting concerns (logging, metrics, tracing)

  • Automatic reconnection and error recovery

  • Graceful shutdown with message draining

Examples:

Basic consumer

consumer = JetstreamBridge::Consumer.new do |event|
  puts "Received: #{event.type} - #{event.payload.to_h}"
  # Process event...
end
consumer.run!

Consumer with middleware

consumer = JetstreamBridge::Consumer.new(handler)
consumer.use(JetstreamBridge::Consumer::LoggingMiddleware.new)
consumer.use(JetstreamBridge::Consumer::MetricsMiddleware.new)
consumer.run!

Using convenience method

JetstreamBridge.subscribe do |event|
  ProcessEventJob.perform_later(event.to_h)
end.run!

Constant Summary

DEFAULT_BATCH_SIZE =

Default number of messages to fetch in each batch

25
FETCH_TIMEOUT_SECS =

Timeout for fetching messages from NATS (seconds)

5
IDLE_SLEEP_SECS =

Initial sleep duration when no messages are available (seconds)

0.05
MAX_IDLE_BACKOFF_SECS =

Maximum sleep duration during idle periods (seconds)

1.0
MiddlewareChain =

Alias middleware classes for easier access

ConsumerMiddleware::MiddlewareChain
LoggingMiddleware =
ConsumerMiddleware::LoggingMiddleware
ErrorHandlingMiddleware =
ConsumerMiddleware::ErrorHandlingMiddleware
MetricsMiddleware =
ConsumerMiddleware::MetricsMiddleware
TracingMiddleware =
ConsumerMiddleware::TracingMiddleware
TimeoutMiddleware =
ConsumerMiddleware::TimeoutMiddleware

Constructor Details

#initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer

Initialize a new Consumer instance.

Examples:

With proc handler

handler = ->(event) { puts "Received: #{event.type}" }
consumer = JetstreamBridge::Consumer.new(handler)

With block

consumer = JetstreamBridge::Consumer.new do |event|
  UserEventHandler.process(event)
end

With custom configuration

consumer = JetstreamBridge::Consumer.new(
  handler,
  durable_name: "my-consumer",
  batch_size: 10
)

Yields:

  • (event)

    Optional block used as the handler. Receives a Models::Event object.

Raises:

  • (ArgumentError) If neither a handler nor a block is provided

  • If destination_app is not configured

  • (ConnectionError) If the JetStream connection is unavailable (call JetstreamBridge.startup! first)
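
The handler rule in the constructor (an explicit handler wins, a block is the fallback, and having neither raises ArgumentError) can be tried without a NATS connection. The sketch below uses a hypothetical stand-in class, HandlerResolver, which is not part of JetstreamBridge; it mirrors only the `handler || block` line from the constructor source.

```ruby
# Hypothetical stand-in (not part of JetstreamBridge): mirrors only the
# `@handler = handler || block` rule from Consumer#initialize.
class HandlerResolver
  attr_reader :handler

  def initialize(handler = nil, &block)
    @handler = handler || block
    raise ArgumentError, 'handler or block required' unless @handler
  end
end

explicit = ->(event) { "lambda saw #{event}" }

# An explicit handler takes precedence when a block is also given.
both = HandlerResolver.new(explicit) { |event| "block saw #{event}" }
puts both.handler.call('user.created')

# A block alone is accepted as the handler.
block_only = HandlerResolver.new { |event| "block saw #{event}" }
puts block_only.handler.call('user.created')

# Neither a handler nor a block: ArgumentError is raised.
begin
  HandlerResolver.new
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

In application code, the same rescue around JetstreamBridge::Consumer.new would catch a missing handler early, before run! is ever called.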



# File 'lib/jetstream_bridge/consumer/consumer.rb', line 97

def initialize(handler = nil, durable_name: nil, batch_size: nil, &block)
  @handler = handler || block
  raise ArgumentError, 'handler or block required' unless @handler

  @batch_size    = Integer(batch_size || DEFAULT_BATCH_SIZE)
  @durable       = durable_name || JetstreamBridge.config.durable_name
  @idle_backoff  = IDLE_SLEEP_SECS
  @reconnect_attempts = 0
  @running = true
  @shutdown_requested = false
  @start_time    = Time.now
  @iterations    = 0
  @last_health_check = Time.now
  # Use existing connection (should already be established)
  @jts = Connection.jetstream
  raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

  @middleware_chain = MiddlewareChain.new

  ensure_destination_app_configured!

  @sub_mgr = SubscriptionManager.new(@jts, @durable, JetstreamBridge.config)
  @processor  = MessageProcessor.new(@jts, @handler, middleware_chain: @middleware_chain)
  @inbox_proc = InboxProcessor.new(@processor) if JetstreamBridge.config.use_inbox

  ensure_subscription!
  setup_signal_handlers
end

Instance Attribute Details

#batch_size ⇒ Integer (readonly)



# File 'lib/jetstream_bridge/consumer/consumer.rb', line 63

def batch_size
  @batch_size
end

#durable ⇒ String (readonly)



# File 'lib/jetstream_bridge/consumer/consumer.rb', line 61

def durable
  @durable
end

#middleware_chain ⇒ MiddlewareChain (readonly)



# File 'lib/jetstream_bridge/consumer/consumer.rb', line 65

def middleware_chain
  @middleware_chain
end

Instance Method Details

#run! ⇒ void

This method returns an undefined value.

Start the consumer and process messages in a blocking loop.

This method blocks the current thread and continuously fetches and processes messages until stop! is called or a signal is received (INT/TERM).

The consumer will:

  • Fetch messages in batches (configurable batch_size)

  • Process each message through the middleware chain

  • Handle errors and reconnection automatically

  • Implement exponential backoff during idle periods

  • Drain in-flight messages during graceful shutdown

Examples:

Basic usage

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
consumer.run!  # Blocks here

In a Rake task

namespace :jetstream do
  task consume: :environment do
    consumer = JetstreamBridge.subscribe { |event| handle(event) }
    trap("TERM") { consumer.stop! }
    consumer.run!
  end
end

With error handling

consumer = JetstreamBridge::Consumer.new do |event|
  process(event)
rescue RecoverableError => e
  raise  # Let NATS retry
rescue UnrecoverableError => e
  logger.error(e)  # Log but don't raise (moves to DLQ if configured)
end
consumer.run!
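
The idle backoff mentioned above is driven by IDLE_SLEEP_SECS (0.05) and MAX_IDLE_BACKOFF_SECS (1.0), but the idle_sleep helper itself is not shown on this page. The sketch below is one plausible shape for it, assuming the delay doubles after each empty batch and resets as soon as a batch contains messages. IdleBackoff and next_sleep are hypothetical names, not library API.

```ruby
# Illustrative sketch only: the library's idle_sleep is not shown here.
# Assumes the delay doubles per empty batch and resets once work arrives.
class IdleBackoff
  INITIAL_SECS = 0.05 # same value as IDLE_SLEEP_SECS
  MAX_SECS     = 1.0  # same value as MAX_IDLE_BACKOFF_SECS

  def initialize(initial: INITIAL_SECS, max: MAX_SECS)
    @initial = initial
    @max = max
    @delay = initial
  end

  # Returns how long to sleep (seconds) after a batch that handled
  # `processed` messages.
  def next_sleep(processed)
    if processed.positive?
      @delay = @initial # work arrived: start over from the initial delay
      return 0.0
    end

    current = @delay
    @delay = [@delay * 2, @max].min # grow, but never past the cap
    current
  end
end

backoff = IdleBackoff.new
# Six empty batches, one batch with 3 messages, then another empty batch.
delays = [0, 0, 0, 0, 0, 0, 3, 0].map { |count| backoff.next_sleep(count) }
puts delays.inspect
```

Capping the delay keeps the worst-case latency for a message that arrives after a quiet period bounded by the maximum, while the doubling avoids polling the server at full speed when the stream is empty.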


# File 'lib/jetstream_bridge/consumer/consumer.rb', line 194

def run!
  Logging.info(
    "Consumer #{@durable} started (batch=#{@batch_size}, dest=#{JetstreamBridge.config.destination_subject})…",
    tag: 'JetstreamBridge::Consumer'
  )
  while @running
    processed = process_batch
    idle_sleep(processed)

    @iterations += 1

    # Periodic health checks every 10 minutes (600 seconds)
    perform_health_check_if_due
  end

  # Drain in-flight messages before exiting
  drain_inflight_messages if @shutdown_requested
  Logging.info("Consumer #{@durable} stopped gracefully", tag: 'JetstreamBridge::Consumer')
end

#stop! ⇒ void

This method returns an undefined value.

Stop the consumer gracefully and drain in-flight messages.

This method signals the consumer to stop fetching new messages and to drain any messages that are currently being processed. It’s safe to call from signal handlers or other threads.

The consumer will:

  • Stop fetching new messages

  • Complete processing of in-flight messages

  • Drain pending messages (up to 5 batches)

  • Close the subscription cleanly

Examples:

In signal handler

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
trap("TERM") { consumer.stop! }
consumer.run!

Manual control

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
worker = Thread.new { consumer.run! }
sleep 10
consumer.stop!  # Stop after 10 seconds
worker.join     # Wait for run! to finish draining before the script exits
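
The flag-based shutdown described above (stop the loop, finish the current batch, then drain a bounded number of further batches) can be sketched with an in-memory queue. LoopingWorker is a hypothetical stand-in and not the library's Consumer; the cap of five drain batches is taken from the list above, and everything else is an assumption made for illustration.

```ruby
# Hypothetical stand-in (not the library's Consumer): shows flag-based
# shutdown in the style of run!/stop!, with a bounded drain of queued work.
class LoopingWorker
  MAX_DRAIN_BATCHES = 5 # mirrors "up to 5 batches" from the docs

  attr_reader :processed

  def initialize(pending, batch_size: 2, &handler)
    @pending = pending.dup
    @batch_size = batch_size
    @handler = handler
    @running = true
    @shutdown_requested = false
    @processed = []
  end

  def run!
    while @running
      process_batch
      # This in-memory sketch has nothing to wait for once the queue is empty.
      @running = false if @pending.empty?
    end
    drain if @shutdown_requested
    self
  end

  # Only flips flags, so it is cheap to call from a handler or another thread.
  def stop!
    @shutdown_requested = true
    @running = false
  end

  private

  # Handles one batch to completion, even if stop! is called midway.
  def process_batch
    batch = @pending.shift(@batch_size)
    batch.each do |item|
      @processed << item
      @handler&.call(item)
    end
    batch.size
  end

  def drain
    MAX_DRAIN_BATCHES.times { break if process_batch.zero? }
  end
end

worker = nil
worker = LoopingWorker.new((1..20).to_a) { |n| worker.stop! if n == 3 }
worker.run!
puts worker.processed.inspect
```

In this sketch the batch that was in flight when stop! was called still completes, and at most five further batches are drained; whatever remains after that is left in the queue.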


# File 'lib/jetstream_bridge/consumer/consumer.rb', line 239

def stop!
  @shutdown_requested = true
  @running = false
  Logging.info("Consumer #{@durable} shutdown requested", tag: 'JetstreamBridge::Consumer')
end

#use(middleware) ⇒ self

Add middleware to the processing chain.

Middleware is executed in the order it’s added. Each middleware must respond to #call(event, &block) and yield to continue the chain.

Examples:

Adding multiple middleware

consumer = JetstreamBridge.subscribe { |event| process(event) }
consumer.use(JetstreamBridge::Consumer::LoggingMiddleware.new)
consumer.use(JetstreamBridge::Consumer::MetricsMiddleware.new)
consumer.use(JetstreamBridge::Consumer::TimeoutMiddleware.new(timeout: 30))
consumer.run!

Custom middleware

class MyMiddleware
  def call(event)
    puts "Before: #{event.type}"
    yield
    puts "After: #{event.type}"
  end
end
consumer.use(MyMiddleware.new)
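
To make the ordering rule concrete, here is a minimal sketch of how a chain could compose middleware so that the first one added becomes the outermost wrapper. SketchChain and TraceMiddleware are hypothetical stand-ins written for this example; the real ConsumerMiddleware::MiddlewareChain may be implemented differently.

```ruby
# Hypothetical stand-in (not ConsumerMiddleware::MiddlewareChain): composes
# middleware so the first one added is the outermost wrapper.
class SketchChain
  def initialize
    @middlewares = []
  end

  def use(middleware)
    @middlewares << middleware
    self
  end

  # Runs every middleware around the final handler block.
  def call(event, &handler)
    innermost = -> { handler.call(event) }
    # Wrap from the last middleware inward so the first added runs first.
    stack = @middlewares.reverse.reduce(innermost) do |inner, middleware|
      -> { middleware.call(event) { inner.call } }
    end
    stack.call
  end
end

# Records when it runs, and passes the handler's result back up the chain.
class TraceMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(_event)
    @log << "#{@name}:before"
    result = yield
    @log << "#{@name}:after"
    result
  end
end

log = []
chain = SketchChain.new
chain.use(TraceMiddleware.new('first', log)).use(TraceMiddleware.new('second', log))
chain.call('user.created') { |event| log << "handler:#{event}" }
puts log.inspect
```

A middleware that does not yield stops the chain at that point, which is how a filtering or short-circuiting middleware would skip the handler.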


# File 'lib/jetstream_bridge/consumer/consumer.rb', line 152

def use(middleware)
  @middleware_chain.use(middleware)
  self
end