Class: JetstreamBridge::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/consumer/consumer_state.rb

Overview

Subscribes to destination subject and processes messages via a pull durable consumer.

The Consumer provides reliable message processing with features like:

  • Durable pull-based subscriptions with configurable batch sizes

  • Optional idempotent inbox pattern for exactly-once processing

  • Middleware support for cross-cutting concerns (logging, metrics, tracing)

  • Automatic reconnection and error recovery

  • Graceful shutdown with message draining

Examples:

Basic consumer

consumer = JetstreamBridge::Consumer.new do |event|
  puts "Received: #{event.type} - #{event.payload.to_h}"
  # Process event...
end
consumer.run!

Consumer with middleware

consumer = JetstreamBridge::Consumer.new(handler)
consumer.use(JetstreamBridge::Consumer::LoggingMiddleware.new)
consumer.use(JetstreamBridge::Consumer::MetricsMiddleware.new)
consumer.run!

Using convenience method

JetstreamBridge.subscribe do |event|
  ProcessEventJob.perform_later(event.to_h)
end.run!

Defined Under Namespace

Classes: ConnectionState, LifecycleState, ProcessingState

Constant Summary collapse

DEFAULT_BATCH_SIZE =

Default number of messages to fetch in each batch

25
FETCH_TIMEOUT_SECS =

Timeout for fetching messages from NATS (seconds)

5
IDLE_SLEEP_SECS =

Initial sleep duration when no messages available (seconds)

0.05
MAX_IDLE_BACKOFF_SECS =

Maximum sleep duration during idle periods (seconds)

1.0
MiddlewareChain =

Alias middleware classes for easier access

ConsumerMiddleware::MiddlewareChain
LoggingMiddleware =
ConsumerMiddleware::LoggingMiddleware
ErrorHandlingMiddleware =
ConsumerMiddleware::ErrorHandlingMiddleware
MetricsMiddleware =
ConsumerMiddleware::MetricsMiddleware
TracingMiddleware =
ConsumerMiddleware::TracingMiddleware
TimeoutMiddleware =
ConsumerMiddleware::TimeoutMiddleware

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer

Initialize a new Consumer instance.

Examples:

With proc handler

handler = ->(event) { puts "Received: #{event.type}" }
consumer = JetstreamBridge::Consumer.new(handler)

With block

consumer = JetstreamBridge::Consumer.new do |event|
  UserEventHandler.process(event)
end

With custom configuration

consumer = JetstreamBridge::Consumer.new(
  handler,
  durable_name: "my-consumer",
  batch_size: 10
)

Parameters:

  • handler (Proc, #call, nil) (defaults to: nil)

    Message handler that processes events. Must respond to #call(event).

  • durable_name (String, nil) (defaults to: nil)

    Optional durable consumer name override. Defaults to config.durable_name.

  • batch_size (Integer, nil) (defaults to: nil)

    Number of messages to fetch per batch. Defaults to DEFAULT_BATCH_SIZE (25).

Yields:

  • (event)

    Optional block as handler. Receives Models::Event object.

Raises:

  • (ArgumentError)

    If neither handler nor block provided

  • (ArgumentError)

    If destination_app not configured

  • (ConnectionError)

    If unable to connect to NATS



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 192

def initialize(handler = nil, durable_name: nil, batch_size: nil, &block)
  @handler = handler || block
  raise ArgumentError, 'handler or block required' unless @handler

  @batch_size    = Integer(batch_size || DEFAULT_BATCH_SIZE)
  @durable       = durable_name || JetstreamBridge.config.durable_name
  @processing_state = ProcessingState.new(idle_backoff: IDLE_SLEEP_SECS)
  @lifecycle_state = LifecycleState.new
  @connection_state = ConnectionState.new
  # Use existing connection (should already be established)
  @jts = Connection.jetstream
  raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

  @middleware_chain = MiddlewareChain.new

  ensure_destination_app_configured!

  @sub_mgr = SubscriptionManager.new(@jts, @durable, JetstreamBridge.config)
  @processor  = MessageProcessor.new(@jts, @handler, middleware_chain: @middleware_chain)
  @inbox_proc = InboxProcessor.new(@processor) if JetstreamBridge.config.use_inbox

  ensure_subscription!
  setup_signal_handlers
end

Instance Attribute Details

#batch_sizeInteger (readonly)

Returns Batch size for message fetching.

Returns:

  • (Integer)

    Batch size for message fetching



152
153
154
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 152

def batch_size
  @batch_size
end

#connection_stateConnectionState (readonly)

Returns Reconnection attempts and health check timing.

Returns:



160
161
162
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 160

def connection_state
  @connection_state
end

#durableString (readonly)

Returns Durable consumer name.

Returns:

  • (String)

    Durable consumer name



150
151
152
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 150

def durable
  @durable
end

#lifecycle_stateLifecycleState (readonly)

Returns Lifecycle flags and timing.

Returns:



158
159
160
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 158

def lifecycle_state
  @lifecycle_state
end

#middleware_chainMiddlewareChain (readonly)

Returns Middleware chain for processing.

Returns:



154
155
156
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 154

def middleware_chain
  @middleware_chain
end

#processing_stateProcessingState (readonly)

Returns Processing counters and backoff state.

Returns:



156
157
158
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 156

def processing_state
  @processing_state
end

Class Method Details

.register_consumer_for_signals(consumer) ⇒ void

This method returns an undefined value.

Register a consumer instance to receive OS signal notifications (INT, TERM).

Parameters:

  • consumer (Consumer)

    Consumer to register



66
67
68
69
70
71
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 66

def register_consumer_for_signals(consumer)
  signal_registry_mutex.synchronize do
    signal_consumers << consumer
    install_signal_handlers_once
  end
end

.reset_signal_handlers!void

This method returns an undefined value.

Clear all registered consumers and reset signal handler state.



84
85
86
87
88
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 84

def reset_signal_handlers!
  signal_registry_mutex.synchronize { signal_consumers.clear }
  @signal_handlers_installed = false
  @previous_signal_handlers = {}
end

.unregister_consumer_for_signals(consumer) ⇒ void

This method returns an undefined value.

Remove a consumer from the signal registry.

Parameters:

  • consumer (Consumer)

    Consumer to unregister



77
78
79
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 77

def unregister_consumer_for_signals(consumer)
  signal_registry_mutex.synchronize { signal_consumers.delete(consumer) }
end

Instance Method Details

#run!void

This method returns an undefined value.

Start the consumer and process messages in a blocking loop.

This method blocks the current thread and continuously fetches and processes messages until stop! is called or a signal is received (INT/TERM).

The consumer will:

  • Fetch messages in batches (configurable batch_size)

  • Process each message through the middleware chain

  • Handle errors and reconnection automatically

  • Implement exponential backoff during idle periods

  • Drain in-flight messages during graceful shutdown

Examples:

Basic usage

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
consumer.run!  # Blocks here

In a Rake task

namespace :jetstream do
  task consume: :environment do
    consumer = JetstreamBridge.subscribe { |event| handle(event) }
    trap("TERM") { consumer.stop! }
    consumer.run!
  end
end

With error handling

consumer = JetstreamBridge::Consumer.new do |event|
  process(event)
rescue RecoverableError => e
  raise  # Let NATS retry
rescue UnrecoverableError => e
  logger.error(e)  # Log but don't raise (moves to DLQ if configured)
end
consumer.run!


285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 285

def run!
  Logging.info(
    "Consumer #{@durable} started (batch=#{@batch_size}, dest=#{JetstreamBridge.config.destination_subject})…",
    tag: 'JetstreamBridge::Consumer'
  )
  while @lifecycle_state.running
    # Check if signal was received and log it (safe from main loop)
    if @lifecycle_state.signal_received && !@lifecycle_state.signal_logged
      Logging.info("Received #{@lifecycle_state.signal_received}, stopping consumer...",
                   tag: 'JetstreamBridge::Consumer')
      @lifecycle_state.signal_logged = true
    end

    Logging.debug(
      "Fetching messages (iteration=#{@processing_state.iterations}, batch_size=#{@batch_size})...",
      tag: 'JetstreamBridge::Consumer'
    )
    processed = process_batch
    Logging.debug(
      "Processed #{processed} messages",
      tag: 'JetstreamBridge::Consumer'
    )
    idle_sleep(processed)

    @processing_state.iterations += 1

    # Periodic health checks every 10 minutes (600 seconds)
    perform_health_check_if_due
  end

  # Drain in-flight messages before exiting
  drain_inflight_messages if @lifecycle_state.shutdown_requested
  Logging.info("Consumer #{@durable} stopped gracefully", tag: 'JetstreamBridge::Consumer')
end

#stop!void

This method returns an undefined value.

Stop the consumer gracefully and drain in-flight messages.

This method signals the consumer to stop processing new messages and drain any messages that are currently being processed. It’s safe to call from signal handlers or other threads.

The consumer will:

  • Stop fetching new messages

  • Complete processing of in-flight messages

  • Drain pending messages (up to 5 batches)

  • Close the subscription cleanly

Examples:

In signal handler

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
trap("TERM") { consumer.stop! }
consumer.run!

Manual control

consumer = JetstreamBridge::Consumer.new { |event| process(event) }
Thread.new { consumer.run! }
sleep 10
consumer.stop!  # Stop after 10 seconds


345
346
347
348
349
350
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 345

def stop!
  @lifecycle_state.stop!
  # Allow other consumers to continue receiving signals without stale references
  self.class.unregister_consumer_for_signals(self)
  Logging.info("Consumer #{@durable} shutdown requested", tag: 'JetstreamBridge::Consumer')
end

#use(middleware) ⇒ self

Add middleware to the processing chain.

Middleware is executed in the order it’s added. Each middleware must respond to #call(event, &block) and yield to continue the chain.

Examples:

Adding multiple middleware

consumer = JetstreamBridge.subscribe { |event| process(event) }
consumer.use(JetstreamBridge::Consumer::LoggingMiddleware.new)
consumer.use(JetstreamBridge::Consumer::MetricsMiddleware.new)
consumer.use(JetstreamBridge::Consumer::TimeoutMiddleware.new(timeout: 30))
consumer.run!

Custom middleware

class MyMiddleware
  def call(event)
    puts "Before: #{event.type}"
    yield
    puts "After: #{event.type}"
  end
end
consumer.use(MyMiddleware.new)

Parameters:

  • middleware (Object)

    Middleware that responds to #call(event, &block). Must yield to continue processing.

Returns:

  • (self)

    Returns self for method chaining



243
244
245
246
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 243

def use(middleware)
  @middleware_chain.use(middleware)
  self
end