Class: JetstreamBridge::Consumer
- Inherits:
-
Object
- Object
- JetstreamBridge::Consumer
- Defined in:
- lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/consumer/consumer_state.rb
Overview
Subscribes to destination subject and processes messages via a pull durable consumer.
The Consumer provides reliable message processing with features like:
-
Durable pull-based subscriptions with configurable batch sizes
-
Optional idempotent inbox pattern for exactly-once processing
-
Middleware support for cross-cutting concerns (logging, metrics, tracing)
-
Automatic reconnection and error recovery
-
Graceful shutdown with message draining
Defined Under Namespace
Classes: ConnectionState, LifecycleState, ProcessingState
Constant Summary collapse
- DEFAULT_BATCH_SIZE =
Default number of messages to fetch in each batch
25- FETCH_TIMEOUT_SECS =
Timeout for fetching messages from NATS (seconds)
5- IDLE_SLEEP_SECS =
Initial sleep duration when no messages available (seconds)
0.05
- MAX_IDLE_BACKOFF_SECS =
Maximum sleep duration during idle periods (seconds)
1.0- MiddlewareChain =
Alias middleware classes for easier access
ConsumerMiddleware::MiddlewareChain
- LoggingMiddleware =
ConsumerMiddleware::LoggingMiddleware
- ErrorHandlingMiddleware =
ConsumerMiddleware::ErrorHandlingMiddleware
- MetricsMiddleware =
ConsumerMiddleware::MetricsMiddleware
- TracingMiddleware =
ConsumerMiddleware::TracingMiddleware
- TimeoutMiddleware =
ConsumerMiddleware::TimeoutMiddleware
Instance Attribute Summary collapse
-
#batch_size ⇒ Integer
readonly
Batch size for message fetching.
-
#connection_state ⇒ ConnectionState
readonly
Reconnection attempts and health check timing.
-
#durable ⇒ String
readonly
Durable consumer name.
-
#lifecycle_state ⇒ LifecycleState
readonly
Lifecycle flags and timing.
-
#middleware_chain ⇒ MiddlewareChain
readonly
Middleware chain for processing.
-
#processing_state ⇒ ProcessingState
readonly
Processing counters and backoff state.
Class Method Summary collapse
-
.register_consumer_for_signals(consumer) ⇒ void
Register a consumer instance to receive OS signal notifications (INT, TERM).
-
.reset_signal_handlers! ⇒ void
Clear all registered consumers and reset signal handler state.
-
.unregister_consumer_for_signals(consumer) ⇒ void
Remove a consumer from the signal registry.
Instance Method Summary collapse
-
#initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer
constructor
Initialize a new Consumer instance.
-
#run! ⇒ void
Start the consumer and process messages in a blocking loop.
-
#stop! ⇒ void
Stop the consumer gracefully and drain in-flight messages.
-
#use(middleware) ⇒ self
Add middleware to the processing chain.
Constructor Details
#initialize(handler = nil, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer
Initialize a new Consumer instance.
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 192 def initialize(handler = nil, durable_name: nil, batch_size: nil, &block) @handler = handler || block raise ArgumentError, 'handler or block required' unless @handler @batch_size = Integer(batch_size || DEFAULT_BATCH_SIZE) @durable = durable_name || JetstreamBridge.config.durable_name @processing_state = ProcessingState.new(idle_backoff: IDLE_SLEEP_SECS) @lifecycle_state = LifecycleState.new @connection_state = ConnectionState.new # Use existing connection (should already be established) @jts = Connection.jetstream raise ConnectionError, 'JetStream connection not available. Call JetstreamBridge.startup! first.' unless @jts @middleware_chain = MiddlewareChain.new ensure_destination_app_configured! @sub_mgr = SubscriptionManager.new(@jts, @durable, JetstreamBridge.config) @processor = MessageProcessor.new(@jts, @handler, middleware_chain: @middleware_chain) @inbox_proc = InboxProcessor.new(@processor) if JetstreamBridge.config.use_inbox ensure_subscription! setup_signal_handlers end |
Instance Attribute Details
#batch_size ⇒ Integer (readonly)
Returns Batch size for message fetching.
152 153 154 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 152 def batch_size @batch_size end |
#connection_state ⇒ ConnectionState (readonly)
Returns Reconnection attempts and health check timing.
160 161 162 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 160 def connection_state @connection_state end |
#durable ⇒ String (readonly)
Returns Durable consumer name.
150 151 152 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 150 def durable @durable end |
#lifecycle_state ⇒ LifecycleState (readonly)
Returns Lifecycle flags and timing.
158 159 160 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 158 def lifecycle_state @lifecycle_state end |
#middleware_chain ⇒ MiddlewareChain (readonly)
Returns Middleware chain for processing.
154 155 156 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 154 def middleware_chain @middleware_chain end |
#processing_state ⇒ ProcessingState (readonly)
Returns Processing counters and backoff state.
156 157 158 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 156 def processing_state @processing_state end |
Class Method Details
.register_consumer_for_signals(consumer) ⇒ void
This method returns an undefined value.
Register a consumer instance to receive OS signal notifications (INT, TERM).
66 67 68 69 70 71 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 66 def register_consumer_for_signals(consumer) signal_registry_mutex.synchronize do signal_consumers << consumer install_signal_handlers_once end end |
.reset_signal_handlers! ⇒ void
This method returns an undefined value.
Clear all registered consumers and reset signal handler state.
84 85 86 87 88 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 84 def reset_signal_handlers! signal_registry_mutex.synchronize { signal_consumers.clear } @signal_handlers_installed = false @previous_signal_handlers = {} end |
.unregister_consumer_for_signals(consumer) ⇒ void
This method returns an undefined value.
Remove a consumer from the signal registry.
77 78 79 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 77 def unregister_consumer_for_signals(consumer) signal_registry_mutex.synchronize { signal_consumers.delete(consumer) } end |
Instance Method Details
#run! ⇒ void
This method returns an undefined value.
Start the consumer and process messages in a blocking loop.
This method blocks the current thread and continuously fetches and processes messages until stop! is called or a signal is received (INT/TERM).
The consumer will:
-
Fetch messages in batches (configurable batch_size)
-
Process each message through the middleware chain
-
Handle errors and reconnection automatically
-
Implement exponential backoff during idle periods
-
Drain in-flight messages during graceful shutdown
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 285 def run! Logging.info( "Consumer #{@durable} started (batch=#{@batch_size}, dest=#{JetstreamBridge.config.destination_subject})…", tag: 'JetstreamBridge::Consumer' ) while @lifecycle_state.running # Check if signal was received and log it (safe from main loop) if @lifecycle_state.signal_received && !@lifecycle_state.signal_logged Logging.info("Received #{@lifecycle_state.signal_received}, stopping consumer...", tag: 'JetstreamBridge::Consumer') @lifecycle_state.signal_logged = true end Logging.debug( "Fetching messages (iteration=#{@processing_state.iterations}, batch_size=#{@batch_size})...", tag: 'JetstreamBridge::Consumer' ) processed = process_batch Logging.debug( "Processed #{processed} messages", tag: 'JetstreamBridge::Consumer' ) idle_sleep(processed) @processing_state.iterations += 1 # Periodic health checks every 10 minutes (600 seconds) perform_health_check_if_due end # Drain in-flight messages before exiting if @lifecycle_state.shutdown_requested Logging.info("Consumer #{@durable} stopped gracefully", tag: 'JetstreamBridge::Consumer') end |
#stop! ⇒ void
This method returns an undefined value.
Stop the consumer gracefully and drain in-flight messages.
This method signals the consumer to stop processing new messages and drain any messages that are currently being processed. It’s safe to call from signal handlers or other threads.
The consumer will:
-
Stop fetching new messages
-
Complete processing of in-flight messages
-
Drain pending messages (up to 5 batches)
-
Close the subscription cleanly
345 346 347 348 349 350 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 345 def stop! @lifecycle_state.stop! # Allow other consumers to continue receiving signals without stale references self.class.unregister_consumer_for_signals(self) Logging.info("Consumer #{@durable} shutdown requested", tag: 'JetstreamBridge::Consumer') end |
#use(middleware) ⇒ self
Add middleware to the processing chain.
Middleware is executed in the order it’s added. Each middleware must respond to #call(event, &block) and yield to continue the chain.
243 244 245 246 |
# File 'lib/jetstream_bridge/consumer/consumer.rb', line 243 def use(middleware) @middleware_chain.use(middleware) self end |