Class: JetstreamBridge::Consumer
- Inherits: Object
- Defined in: lib/jetstream_bridge/consumer/consumer.rb, lib/jetstream_bridge/consumer/middleware.rb
Overview
Subscribes to the destination subject and processes messages via a durable pull consumer.
The Consumer provides reliable message processing with features like:
- Durable pull-based subscriptions with configurable batch sizes
- Optional idempotent inbox pattern for exactly-once processing
- Middleware support for cross-cutting concerns (logging, metrics, tracing)
- Automatic reconnection and error recovery
- Graceful shutdown with message draining
Constant Summary
- DEFAULT_BATCH_SIZE = 25
  Default number of messages to fetch in each batch
- FETCH_TIMEOUT_SECS = 5
  Timeout for fetching messages from NATS (seconds)
- IDLE_SLEEP_SECS = 0.05
  Initial sleep duration when no messages available (seconds)
- MAX_IDLE_BACKOFF_SECS = 1.0
  Maximum sleep duration during idle periods (seconds)
- MiddlewareChain = ConsumerMiddleware::MiddlewareChain
  Alias middleware classes for easier access
- LoggingMiddleware = ConsumerMiddleware::LoggingMiddleware
- ErrorHandlingMiddleware = ConsumerMiddleware::ErrorHandlingMiddleware
- MetricsMiddleware = ConsumerMiddleware::MetricsMiddleware
- TracingMiddleware = ConsumerMiddleware::TracingMiddleware
- TimeoutMiddleware = ConsumerMiddleware::TimeoutMiddleware
Instance Attribute Summary
- #batch_size ⇒ Integer (readonly)
  Batch size for message fetching.
- #durable ⇒ String (readonly)
  Durable consumer name.
- #middleware_chain ⇒ MiddlewareChain (readonly)
  Middleware chain for processing.
Instance Method Summary
- #initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer (constructor)
  Initialize a new Consumer instance.
- #run! ⇒ void
  Start the consumer and process messages in a blocking loop.
- #stop! ⇒ void
  Stop the consumer gracefully and drain in-flight messages.
- #use(middleware) ⇒ self
  Add middleware to the processing chain.
Constructor Details
#initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer
Initialize a new Consumer instance.
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 97
def initialize(handler = nil, durable_name: nil, batch_size: nil, &block)
  @handler = handler || block
  raise ArgumentError, 'handler or block required' unless @handler

  @batch_size = Integer(batch_size || DEFAULT_BATCH_SIZE)
  @durable = durable_name || JetstreamBridge.config.durable_name
  @idle_backoff = IDLE_SLEEP_SECS
  @reconnect_attempts = 0
  @running = true
  @shutdown_requested = false
  @start_time = Time.now
  @iterations = 0
  @last_health_check = Time.now

  # Use existing connection (should already be established)
  @jts = Connection.jetstream
  raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts

  @middleware_chain = MiddlewareChain.new

  ensure_destination_app_configured!

  @sub_mgr = SubscriptionManager.new(@jts, @durable, JetstreamBridge.config)
  @processor = MessageProcessor.new(@jts, @handler, middleware_chain: @middleware_chain)
  @inbox_proc = InboxProcessor.new(@processor) if JetstreamBridge.config.use_inbox

  ensure_subscription!
  setup_signal_handlers
end
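The first lines of the constructor resolve the handler and coerce the batch size before any NATS work happens. The sketch below isolates just that argument handling in a hypothetical HandlerArgsSketch class (the class name is an assumption made for illustration and is not part of the gem), so it can be run without a JetStream connection.

```ruby
# Hypothetical stand-in that mirrors only the argument handling of
# Consumer#initialize; it opens no NATS connection.
class HandlerArgsSketch
  DEFAULT_BATCH_SIZE = 25 # same value as Consumer::DEFAULT_BATCH_SIZE

  attr_reader :handler, :batch_size

  def initialize(handler = nil, batch_size: nil, &block)
    # A positional callable takes precedence over a block when both are given.
    @handler = handler || block
    raise ArgumentError, 'handler or block required' unless @handler

    # Integer() accepts 10 or "10" but raises ArgumentError for "ten".
    @batch_size = Integer(batch_size || DEFAULT_BATCH_SIZE)
  end
end

with_block  = HandlerArgsSketch.new { |event| event }
with_lambda = HandlerArgsSketch.new(->(event) { event }, batch_size: '10')
```

Because the real constructor uses the same Integer() coercion, a batch size read from an environment variable as a string should work there too, while a non-numeric value fails fast at construction time.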
Instance Attribute Details
#batch_size ⇒ Integer (readonly)
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 63
def batch_size
  @batch_size
end
#durable ⇒ String (readonly)
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 61
def durable
  @durable
end
#middleware_chain ⇒ MiddlewareChain (readonly)
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 65
def middleware_chain
  @middleware_chain
end
Instance Method Details
#run! ⇒ void
This method returns an undefined value.
Start the consumer and process messages in a blocking loop.
This method blocks the current thread and continuously fetches and processes messages until stop! is called or a signal is received (INT/TERM).
The consumer will:
- Fetch messages in batches (configurable batch_size)
- Process each message through the middleware chain
- Handle errors and reconnection automatically
- Implement exponential backoff during idle periods
- Drain in-flight messages during graceful shutdown
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 194
def run!
  Logging.info(
    "Consumer #{@durable} started (batch=#{@batch_size}, dest=#{JetstreamBridge.config.destination_subject})…",
    tag: 'JetstreamBridge::Consumer'
  )
  while @running
    processed = process_batch
    idle_sleep(processed)

    @iterations += 1

    # Periodic health checks every 10 minutes (600 seconds)
    perform_health_check_if_due
  end

  # Drain in-flight messages before exiting if @shutdown_requested
  Logging.info("Consumer #{@durable} stopped gracefully", tag: 'JetstreamBridge::Consumer')
end
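The idle backoff mentioned above is bounded by IDLE_SLEEP_SECS and MAX_IDLE_BACKOFF_SECS, but this page does not show how idle_sleep grows the delay. The following is a minimal sketch of one plausible scheme, assuming the delay doubles after every empty batch and resets as soon as a message is processed. The module name, the helper name, and the doubling factor are all assumptions, not the gem's confirmed behavior.

```ruby
# Hypothetical model of idle backoff; not the gem's idle_sleep method.
module IdleBackoffSketch
  IDLE_SLEEP_SECS       = 0.05 # initial idle sleep, from the constant summary
  MAX_IDLE_BACKOFF_SECS = 1.0  # idle sleep ceiling, from the constant summary

  # Returns the next sleep duration given the current one and the number
  # of messages processed in the last batch.
  # Assumption: double on an empty batch, reset when work was done.
  def self.next_delay(current, processed)
    return IDLE_SLEEP_SECS if processed.positive?

    [current * 2, MAX_IDLE_BACKOFF_SECS].min
  end
end

# Six consecutive empty batches: the delay grows, then stays at the cap.
delay = IdleBackoffSketch::IDLE_SLEEP_SECS
delays = Array.new(6) { delay = IdleBackoffSketch.next_delay(delay, 0) }
```

Under these assumptions an idle consumer settles at one fetch attempt per second plus the fetch timeout, and returns to the short initial sleep as soon as traffic resumes.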
#stop! ⇒ void
This method returns an undefined value.
Stop the consumer gracefully and drain in-flight messages.
This method signals the consumer to stop processing new messages and drain any messages that are currently being processed. It’s safe to call from signal handlers or other threads.
The consumer will:
- Stop fetching new messages
- Complete processing of in-flight messages
- Drain pending messages (up to 5 batches)
- Close the subscription cleanly
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 239
def stop!
  @shutdown_requested = true
  @running = false
  Logging.info("Consumer #{@durable} shutdown requested", tag: 'JetstreamBridge::Consumer')
end
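Since stop! only flips two flags, the blocking loop in run! notices the request on its next iteration. The sketch below models that handshake with a hypothetical StopFlagSketch class that counts iterations instead of fetching from NATS. It illustrates the flag pattern only; draining and subscription cleanup are left out.

```ruby
# Hypothetical stand-in for the run!/stop! flag handshake; it fetches
# nothing from NATS and only counts loop iterations.
class StopFlagSketch
  attr_reader :iterations

  def initialize
    @running = true
    @shutdown_requested = false
    @iterations = 0
  end

  # Blocks the calling thread until stop! flips @running.
  def run!
    while @running
      @iterations += 1
      sleep 0.01 # stands in for fetching and processing one batch
    end
    @shutdown_requested ? :stopped_gracefully : :stopped
  end

  # Only sets flags, so it returns immediately.
  def stop!
    @shutdown_requested = true
    @running = false
  end
end

sketch = StopFlagSketch.new
worker = Thread.new { sketch.run! } # run! blocks, so give it its own thread
sleep 0.05
sketch.stop!                        # requested from the main thread
result = worker.value               # joins the thread and returns run!'s result
```

Running the loop in a dedicated thread, as here, is one way to keep the main thread free to call stop! when the surrounding application shuts down.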
#use(middleware) ⇒ self
Add middleware to the processing chain.
Middleware is executed in the order it’s added. Each middleware must respond to #call(event, &block) and yield to continue the chain.
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 152
def use(middleware)
  @middleware_chain.use(middleware)
  self
end
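To make the #call(event, &block) contract concrete, here is a small sketch of a custom middleware together with a simplified chain. RecordingMiddleware and ChainSketch are hypothetical names invented for this example, and ChainSketch is only a stand-in for illustration, not the gem's MiddlewareChain. The event is a plain string here, whereas the real consumer passes its own event object.

```ruby
# Hypothetical middleware: records when it runs around the rest of the chain.
class RecordingMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  # Satisfies the #call(event, &block) contract by yielding to continue.
  def call(event)
    @log << "#{@name}:before"
    yield # hands control to the next middleware or the handler
    @log << "#{@name}:after"
  end
end

# Simplified stand-in for a middleware chain, NOT the gem's implementation.
class ChainSketch
  def initialize
    @middlewares = []
  end

  def use(middleware)
    @middlewares << middleware
    self # returning self allows chained use calls, as Consumer#use does
  end

  def call(event, &handler)
    # Wrap the handler from the inside out so the first-added middleware
    # ends up outermost and therefore runs first.
    chain = @middlewares.reverse.reduce(handler) do |nxt, mw|
      ->(evt) { mw.call(evt) { nxt.call(evt) } }
    end
    chain.call(event)
  end
end

log = []
chain = ChainSketch.new
                   .use(RecordingMiddleware.new('first', log))
                   .use(RecordingMiddleware.new('second', log))
chain.call('user.created') { |event| log << "handled #{event}" }
```

After the call, log shows each middleware wrapping the ones added after it: the first middleware runs its "before" step first and its "after" step last. A middleware that never yields would short-circuit the chain, so the handler would not run.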