Module: JetstreamBridge
- Extended by:
- Core::BridgeHelpers
- Defined in:
- lib/jetstream_bridge.rb,
lib/jetstream_bridge/errors.rb,
lib/jetstream_bridge/version.rb,
lib/jetstream_bridge/core/config.rb,
lib/jetstream_bridge/core/logging.rb,
lib/jetstream_bridge/models/event.rb,
lib/jetstream_bridge/test_helpers.rb,
lib/jetstream_bridge/core/duration.rb,
lib/jetstream_bridge/rails/railtie.rb,
lib/jetstream_bridge/models/subject.rb,
lib/jetstream_bridge/core/connection.rb,
lib/jetstream_bridge/topology/stream.rb,
lib/jetstream_bridge/core/model_utils.rb,
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/core/debug_helper.rb,
lib/jetstream_bridge/rails/integration.rb,
lib/jetstream_bridge/topology/topology.rb,
lib/jetstream_bridge/core/config_preset.rb,
lib/jetstream_bridge/models/inbox_event.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/core/bridge_helpers.rb,
lib/jetstream_bridge/core/retry_strategy.rb,
lib/jetstream_bridge/models/outbox_event.rb,
lib/jetstream_bridge/publisher/publisher.rb,
lib/jetstream_bridge/models/event_envelope.rb,
lib/jetstream_bridge/models/publish_result.rb,
lib/jetstream_bridge/test_helpers/fixtures.rb,
lib/jetstream_bridge/test_helpers/matchers.rb,
lib/jetstream_bridge/consumer/dlq_publisher.rb,
lib/jetstream_bridge/core/model_codec_setup.rb,
lib/jetstream_bridge/test_helpers/mock_nats.rb,
lib/jetstream_bridge/topology/overlap_guard.rb,
lib/jetstream_bridge/core/connection_factory.rb,
lib/jetstream_bridge/topology/subject_matcher.rb,
lib/jetstream_bridge/publisher/batch_publisher.rb,
lib/jetstream_bridge/consumer/message_processor.rb,
lib/jetstream_bridge/publisher/outbox_repository.rb,
lib/jetstream_bridge/consumer/inbox/inbox_message.rb,
lib/jetstream_bridge/consumer/subscription_manager.rb,
lib/jetstream_bridge/consumer/inbox/inbox_processor.rb,
lib/jetstream_bridge/consumer/inbox/inbox_repository.rb,
lib/jetstream_bridge/test_helpers/integration_helpers.rb,
lib/generators/jetstream_bridge/install/install_generator.rb,
lib/generators/jetstream_bridge/migrations/migrations_generator.rb,
lib/generators/jetstream_bridge/initializer/initializer_generator.rb,
lib/generators/jetstream_bridge/health_check/health_check_generator.rb
Overview
JetstreamBridge
Defined Under Namespace
Modules: ConfigPreset, ConsumerMiddleware, Core, DebugHelper, Duration, Generators, Logging, ModelCodecSetup, ModelUtils, Models, Rails, StreamSupport, SubjectMatcher, TestHelpers Classes: BackoffStrategy, BatchPublishError, BatchPublisher, Config, ConfigurationError, Connection, ConnectionError, ConnectionNotEstablishedError, Consumer, ConsumerError, DlqError, DlqPublishFailedError, DlqPublisher, Error, ExponentialBackoffStrategy, HandlerError, HealthCheckFailedError, InboxError, InboxEvent, InboxMessage, InboxProcessor, InboxRepository, InvalidSubjectError, LinearBackoffStrategy, MessageContext, MessageProcessor, MissingConfigurationError, OutboxError, OutboxEvent, OutboxRepository, OverlapGuard, PublishError, PublishFailedError, Publisher, PublisherRetryStrategy, Railtie, RetryExhausted, RetryStrategy, Stream, StreamCreationFailedError, StreamNotFoundError, SubjectOverlapError, SubscriptionManager, Topology, TopologyError
Constant Summary collapse
- VERSION =
'4.4.0'
Class Method Summary collapse
- .config ⇒ Object
-
.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config
Configure JetStream Bridge settings.
-
.configure_for(preset) {|Config| ... } ⇒ Config
Configure with a preset.
-
.connect_and_ensure_stream! ⇒ Object
Establishes a connection and ensures stream topology.
-
.connected? ⇒ Boolean
Check if connected to NATS.
-
.ensure_topology! ⇒ Object
Backwards-compatible alias for the previous method name.
-
.health_check(skip_cache: false) ⇒ Hash
Active health check for monitoring and readiness probes.
-
.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult
Convenience method to publish events.
-
.publish! ⇒ Models::PublishResult
Publish variant that raises on error.
-
.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult
Batch publish multiple events efficiently.
-
.reconnect! ⇒ void
Reconnect to NATS.
- .reset! ⇒ Object
-
.shutdown! ⇒ void
Gracefully shutdown the JetStream Bridge connection.
-
.startup! ⇒ void
Initialize the JetStream Bridge connection and topology.
-
.stream_info ⇒ Hash
Get stream information for the configured stream.
-
.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread
Convenience method to start consuming messages.
- .use_dlq? ⇒ Boolean
- .use_inbox? ⇒ Boolean
- .use_outbox? ⇒ Boolean
Class Method Details
.config ⇒ Object
71 72 73 |
# File 'lib/jetstream_bridge.rb', line 71 def config @config ||= Config.new end |
.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config
Configure JetStream Bridge settings
This method sets configuration WITHOUT automatically establishing a connection. Connection must be established explicitly via startup! or will be established automatically on first use (publish/subscribe) or via Rails railtie initialization.
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/jetstream_bridge.rb', line 95 def configure(overrides = {}, **extra_overrides) # Merge extra keyword arguments into overrides hash all_overrides = overrides.nil? ? extra_overrides : overrides.merge(extra_overrides) cfg = config all_overrides.each { |k, v| assign_config_option!(cfg, k, v) } unless all_overrides.empty? yield(cfg) if block_given? cfg end |
.configure_for(preset) {|Config| ... } ⇒ Config
Configure with a preset
This method applies a configuration preset. Connection must be established separately via startup! or via Rails railtie.
122 123 124 125 126 127 |
# File 'lib/jetstream_bridge.rb', line 122 def configure_for(preset) configure do |cfg| cfg.apply_preset(preset) yield(cfg) if block_given? end end |
.connect_and_ensure_stream! ⇒ Object
Establishes a connection and ensures stream topology.
202 203 204 205 206 207 |
# File 'lib/jetstream_bridge.rb', line 202 def connect_and_ensure_stream! Connection.connect! jts = Connection.jetstream Topology.ensure!(jts) jts end |
.connected? ⇒ Boolean
Check if connected to NATS
288 289 290 291 292 |
# File 'lib/jetstream_bridge.rb', line 288 def connected? Connection.instance.connected? rescue StandardError false end |
.ensure_topology! ⇒ Object
Backwards-compatible alias for the previous method name
210 211 212 |
# File 'lib/jetstream_bridge.rb', line 210 def ensure_topology! connect_and_ensure_stream! end |
.health_check(skip_cache: false) ⇒ Hash
Active health check for monitoring and readiness probes
Performs actual operations to verify system health:
-
Checks NATS connection (active: calls account_info API)
-
Verifies stream exists and is accessible (active: queries stream info)
-
Tests NATS round-trip communication (active: RTT measurement)
Rate Limiting: To prevent abuse, uncached health checks are limited to once every 5 seconds. Cached results (within 30s TTL) bypass this limit via Connection.instance.connected?.
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/jetstream_bridge.rb', line 227 def health_check(skip_cache: false) # Rate limit uncached requests to prevent abuse (max 1 per 5 seconds) enforce_health_check_rate_limit! if skip_cache start_time = Time.now conn_instance = Connection.instance # Active check: calls @jts.account_info internally # Pass skip_cache to force fresh check if requested connected = conn_instance.connected?(skip_cache: skip_cache) connected_at = conn_instance.connected_at connection_state = conn_instance.state last_error = conn_instance.last_reconnect_error last_error_at = conn_instance.last_reconnect_error_at # Active check: queries actual stream from NATS server stream_info = connected ? fetch_stream_info : { exists: false, name: config.stream_name } # Active check: measure NATS round-trip time rtt_ms = measure_nats_rtt if connected health_check_duration_ms = ((Time.now - start_time) * 1000).round(2) { healthy: connected && stream_info&.fetch(:exists, false), connection: { state: connection_state, connected: connected, connected_at: connected_at&.iso8601, last_error: last_error&., last_error_at: last_error_at&.iso8601 }, stream: stream_info, performance: { nats_rtt_ms: rtt_ms, health_check_duration_ms: health_check_duration_ms }, config: { env: config.env, app_name: config.app_name, destination_app: config.destination_app, use_outbox: config.use_outbox, use_inbox: config.use_inbox, use_dlq: config.use_dlq }, version: JetstreamBridge::VERSION } rescue StandardError => e { healthy: false, connection: { state: :failed, connected: false }, error: "#{e.class}: #{e.}" } end |
.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult
Convenience method to publish events
Automatically establishes connection on first use if not already connected.
Supports three usage patterns:
-
Structured parameters (recommended): JetstreamBridge.publish(resource_type: ‘user’, event_type: ‘created’, payload: { id: 1, name: ‘Ada’ })
-
Simplified hash (infers resource_type from event_type): JetstreamBridge.publish(event_type: ‘user.created’, payload: { id: 1, name: ‘Ada’ })
-
Complete envelope (advanced): JetstreamBridge.publish({ event_type: ‘created’, resource_type: ‘user’, payload: …, event_id: ‘…’ })
331 332 333 334 335 336 |
# File 'lib/jetstream_bridge.rb', line 331 def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **) connect_if_needed! publisher = Publisher.new publisher.publish(event_or_hash, resource_type: resource_type, event_type: event_type, payload: payload, subject: subject, **) end |
.publish! ⇒ Models::PublishResult
Publish variant that raises on error
347 348 349 350 351 352 353 354 355 |
# File 'lib/jetstream_bridge.rb', line 347 def publish!(...) result = publish(...) if result.failure? raise PublishError.new(result.error&., event_id: result.event_id, subject: result.subject) end result end |
.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult
Batch publish multiple events efficiently
369 370 371 372 373 |
# File 'lib/jetstream_bridge.rb', line 369 def publish_batch batch = BatchPublisher.new yield(batch) if block_given? batch.publish end |
.reconnect! ⇒ void
This method returns an undefined value.
Reconnect to NATS
Closes existing connection and establishes a new one. Useful for:
-
Forking web servers (Puma, Unicorn) after worker boot
-
Recovering from connection issues
-
Configuration changes that require reconnection
161 162 163 164 165 |
# File 'lib/jetstream_bridge.rb', line 161 def reconnect! Logging.info('Reconnecting to NATS...', tag: 'JetstreamBridge') shutdown! if @connection_initialized startup! end |
.reset! ⇒ Object
129 130 131 132 |
# File 'lib/jetstream_bridge.rb', line 129 def reset! @config = nil @connection_initialized = false end |
.shutdown! ⇒ void
This method returns an undefined value.
Gracefully shutdown the JetStream Bridge connection
Closes the NATS connection and cleans up resources. Should be called during application shutdown (e.g., in at_exit or signal handlers).
173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/jetstream_bridge.rb', line 173 def shutdown! return unless @connection_initialized begin nc = Connection.nc nc&.close if nc&.connected? Logging.info('JetStream Bridge shut down gracefully', tag: 'JetstreamBridge') rescue StandardError => e Logging.error("Error during shutdown: #{e.}", tag: 'JetstreamBridge') ensure @connection_initialized = false end end |
.startup! ⇒ void
This method returns an undefined value.
Initialize the JetStream Bridge connection and topology
This method can be called explicitly if needed. It’s idempotent and safe to call multiple times.
139 140 141 142 143 144 145 |
# File 'lib/jetstream_bridge.rb', line 139 def startup! return if @connection_initialized connect_and_ensure_stream! @connection_initialized = true Logging.info('JetStream Bridge started successfully', tag: 'JetstreamBridge') end |
.stream_info ⇒ Hash
Get stream information for the configured stream
297 298 299 |
# File 'lib/jetstream_bridge.rb', line 297 def stream_info fetch_stream_info end |
.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread
Convenience method to start consuming messages
Automatically establishes connection on first use if not already connected.
Supports two usage patterns:
-
With a block (recommended): consumer = JetstreamBridge.subscribe do |event|
puts "Received: #{event.type} on #{event.subject} (attempt #{event.deliveries})"end consumer.run!
-
With auto-run (returns Thread): thread = JetstreamBridge.subscribe(run: true) do |event|
puts "Received: #{event.type}"end thread.join # Wait for consumer to finish
-
With a handler object: handler = ->(event) { puts event.type } consumer = JetstreamBridge.subscribe(handler) consumer.run!
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/jetstream_bridge.rb', line 404 def subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil, &block) connect_if_needed! handler ||= block raise ArgumentError, 'Handler or block required' unless handler consumer = Consumer.new(handler, durable_name: durable_name, batch_size: batch_size) if run thread = Thread.new { consumer.run! } thread.abort_on_exception = true thread else consumer end end |
.use_dlq? ⇒ Boolean
195 196 197 |
# File 'lib/jetstream_bridge.rb', line 195 def use_dlq? config.use_dlq end |
.use_inbox? ⇒ Boolean
191 192 193 |
# File 'lib/jetstream_bridge.rb', line 191 def use_inbox? config.use_inbox end |
.use_outbox? ⇒ Boolean
187 188 189 |
# File 'lib/jetstream_bridge.rb', line 187 def use_outbox? config.use_outbox end |