Module: JetstreamBridge
- Extended by:
- Core::BridgeHelpers
- Defined in:
- lib/jetstream_bridge.rb,
lib/jetstream_bridge/errors.rb,
lib/jetstream_bridge/version.rb,
lib/jetstream_bridge/core/config.rb,
lib/jetstream_bridge/provisioner.rb,
lib/jetstream_bridge/core/logging.rb,
lib/jetstream_bridge/models/event.rb,
lib/jetstream_bridge/test_helpers.rb,
lib/jetstream_bridge/core/duration.rb,
lib/jetstream_bridge/rails/railtie.rb,
lib/jetstream_bridge/config_helpers.rb,
lib/jetstream_bridge/models/subject.rb,
lib/jetstream_bridge/core/connection.rb,
lib/jetstream_bridge/topology/stream.rb,
lib/jetstream_bridge/core/model_utils.rb,
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/core/debug_helper.rb,
lib/jetstream_bridge/rails/integration.rb,
lib/jetstream_bridge/topology/topology.rb,
lib/jetstream_bridge/core/config_preset.rb,
lib/jetstream_bridge/models/inbox_event.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/core/bridge_helpers.rb,
lib/jetstream_bridge/core/retry_strategy.rb,
lib/jetstream_bridge/models/outbox_event.rb,
lib/jetstream_bridge/publisher/publisher.rb,
lib/jetstream_bridge/models/event_envelope.rb,
lib/jetstream_bridge/models/publish_result.rb,
lib/jetstream_bridge/test_helpers/fixtures.rb,
lib/jetstream_bridge/test_helpers/matchers.rb,
lib/jetstream_bridge/consumer/dlq_publisher.rb,
lib/jetstream_bridge/core/model_codec_setup.rb,
lib/jetstream_bridge/test_helpers/mock_nats.rb,
lib/jetstream_bridge/topology/overlap_guard.rb,
lib/jetstream_bridge/consumer/consumer_state.rb,
lib/jetstream_bridge/core/connection_factory.rb,
lib/jetstream_bridge/config_helpers/lifecycle.rb,
lib/jetstream_bridge/topology/subject_matcher.rb,
lib/jetstream_bridge/publisher/batch_publisher.rb,
lib/jetstream_bridge/consumer/message_processor.rb,
lib/jetstream_bridge/core/consumer_mode_resolver.rb,
lib/jetstream_bridge/publisher/outbox_repository.rb,
lib/jetstream_bridge/consumer/inbox/inbox_message.rb,
lib/jetstream_bridge/consumer/subscription_manager.rb,
lib/jetstream_bridge/consumer/inbox/inbox_processor.rb,
lib/jetstream_bridge/consumer/inbox/inbox_repository.rb,
lib/jetstream_bridge/test_helpers/integration_helpers.rb,
lib/jetstream_bridge/consumer/pull_subscription_builder.rb,
lib/generators/jetstream_bridge/install/install_generator.rb,
lib/generators/jetstream_bridge/migrations/migrations_generator.rb,
lib/generators/jetstream_bridge/initializer/initializer_generator.rb,
lib/generators/jetstream_bridge/health_check/health_check_generator.rb
Overview
JetstreamBridge
Defined Under Namespace
Modules: ConfigHelpers, ConfigPreset, ConsumerMiddleware, ConsumerModeResolver, Core, DebugHelper, Duration, Generators, Logging, ModelCodecSetup, ModelUtils, Models, Rails, StreamSupport, SubjectMatcher, TestHelpers Classes: BackoffStrategy, BatchPublishError, BatchPublisher, Config, ConfigurationError, Connection, ConnectionError, ConnectionNotEstablishedError, Consumer, ConsumerError, ConsumerProvisioningError, DlqError, DlqPublishFailedError, DlqPublisher, Error, ExponentialBackoffStrategy, HandlerError, HealthCheckFailedError, InboxError, InboxEvent, InboxMessage, InboxProcessor, InboxRepository, InvalidSubjectError, LinearBackoffStrategy, MessageContext, MessageProcessor, MissingConfigurationError, OutboxError, OutboxEvent, OutboxRepository, OverlapGuard, Provisioner, PublishError, PublishFailedError, Publisher, PublisherRetryStrategy, PullSubscriptionBuilder, Railtie, RetryExhausted, RetryStrategy, Stream, StreamCreationFailedError, StreamNotFoundError, SubjectOverlapError, SubscriptionManager, Topology, TopologyError
Constant Summary collapse
- VERSION =
'7.1.4'
Class Method Summary collapse
-
.config ⇒ Config
Returns the current configuration, creating a default instance if needed.
-
.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config
Configure JetStream Bridge settings.
-
.configure_for(preset) {|Config| ... } ⇒ Config
Configure with a preset.
-
.connect_and_provision! ⇒ Object
Establishes a connection and provisions stream topology.
-
.connected? ⇒ Boolean
Check if connected to NATS.
-
.health_check(skip_cache: false) ⇒ Hash
Active health check for monitoring and readiness probes.
-
.provision!(provision_consumer: true) ⇒ Object
Provision stream/consumer using management credentials (out of band from runtime).
-
.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult
Convenience method to publish events.
-
.publish! ⇒ Models::PublishResult
Publish variant that raises on error.
-
.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult
Batch publish multiple events efficiently.
-
.reconnect! ⇒ void
Reconnect to NATS.
-
.reset! ⇒ void
Reset all configuration and connection state.
-
.shutdown! ⇒ void
Gracefully shutdown the JetStream Bridge connection.
-
.startup! ⇒ void
Initialize the JetStream Bridge connection and topology.
-
.stream_info ⇒ Hash
Get stream information for the configured stream.
-
.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread
Convenience method to start consuming messages.
-
.use_dlq? ⇒ Boolean
Whether the dead letter queue is enabled.
-
.use_inbox? ⇒ Boolean
Whether the idempotent inbox pattern is enabled.
-
.use_outbox? ⇒ Boolean
Whether the transactional outbox pattern is enabled.
Class Method Details
.config ⇒ Config
Returns the current configuration, creating a default instance if needed.
74 75 76 |
# File 'lib/jetstream_bridge.rb', line 74 def config @config ||= Config.new end |
.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config
Configure JetStream Bridge settings
This method sets configuration WITHOUT automatically establishing a connection. Connection must be established explicitly via startup! or will be established automatically on first use (publish/subscribe) or via Rails railtie initialization.
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/jetstream_bridge.rb', line 99 def configure(overrides = {}, **extra_overrides) # Merge extra keyword arguments into overrides hash all_overrides = overrides.nil? ? extra_overrides : overrides.merge(extra_overrides) cfg = config all_overrides.each { |k, v| assign_config_option!(cfg, k, v) } unless all_overrides.empty? yield(cfg) if block_given? cfg end |
.configure_for(preset) {|Config| ... } ⇒ Config
Configure with a preset
This method applies a configuration preset. Connection must be established separately via startup! or via Rails railtie.
126 127 128 129 130 131 |
# File 'lib/jetstream_bridge.rb', line 126 def configure_for(preset) configure do |cfg| cfg.apply_preset(preset) yield(cfg) if block_given? end end |
.connect_and_provision! ⇒ Object
Establishes a connection and provisions stream topology.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/jetstream_bridge.rb', line 225 def connect_and_provision! config.validate! provision = config.auto_provision Connection.connect!(verify_js: provision) jts = Connection.jetstream raise ConnectionNotEstablishedError, 'JetStream connection not available' unless jts if provision Provisioner.new(config: config).provision_stream!(jts: jts) else Logging.info( 'auto_provision=false: skipping stream provisioning and JetStream account_info. ' \ 'Run `bundle exec rake jetstream_bridge:provision` with admin credentials to create/update topology.', tag: 'JetstreamBridge' ) end jts end |
.connected? ⇒ Boolean
Check if connected to NATS
302 303 304 305 306 |
# File 'lib/jetstream_bridge.rb', line 302 def connected? Connection.instance.connected? rescue StandardError false end |
.health_check(skip_cache: false) ⇒ Hash
Active health check for monitoring and readiness probes
Performs actual operations to verify system health:
-
Checks NATS connection (active: calls account_info API)
-
Verifies stream exists and is accessible (active: queries stream info)
-
Tests NATS round-trip communication (active: RTT measurement)
Rate Limiting: To prevent abuse, uncached health checks are limited to once every 5 seconds. Cached results (within 30s TTL) bypass this limit via Connection.instance.connected?.
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 |
# File 'lib/jetstream_bridge.rb', line 267 def health_check(skip_cache: false) # Rate limit uncached requests to prevent abuse (max 1 per 5 seconds) enforce_health_check_rate_limit! if skip_cache start_time = Time.now conn_status = connection_snapshot(Connection.instance, skip_cache: skip_cache) stream_info = stream_status(conn_status[:connected]) rtt_ms = measure_nats_rtt if conn_status[:connected] health_check_duration_ms = elapsed_ms(start_time) { healthy: health_flag(conn_status[:connected], stream_info), connection: connection_payload(conn_status), stream: stream_info, performance: { nats_rtt_ms: rtt_ms, health_check_duration_ms: health_check_duration_ms }, config: config_summary, version: JetstreamBridge::VERSION } rescue StandardError => e { healthy: false, connection: { state: :failed, connected: false }, error: "#{e.class}: #{e.message}" } end |
.provision!(provision_consumer: true) ⇒ Object
Provision stream/consumer using management credentials (out of band from runtime).
249 250 251 252 |
# File 'lib/jetstream_bridge.rb', line 249 def provision!(provision_consumer: true) config.validate! Provisioner.new(config: config).provision!(provision_consumer: provision_consumer) end |
.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult
Convenience method to publish events
Automatically establishes connection on first use if not already connected.
Supports three usage patterns:
-
Structured parameters (recommended): JetstreamBridge.publish(resource_type: ‘user’, event_type: ‘created’, payload: { id: 1, name: ‘Ada’ })
-
Simplified hash (infers resource_type from event_type): JetstreamBridge.publish(event_type: ‘user.created’, payload: { id: 1, name: ‘Ada’ })
-
Complete envelope (advanced): JetstreamBridge.publish({ event_type: ‘created’, resource_type: ‘user’, payload: …, event_id: ‘…’ })
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/jetstream_bridge.rb', line 346 def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **) retried_after_reconnect = false begin connect_if_needed! publisher = Publisher.new publisher.publish(event_or_hash, resource_type: resource_type, event_type: event_type, payload: payload, subject: subject, **) rescue ConnectionNotEstablishedError => e raise if retried_after_reconnect retried_after_reconnect = true Logging.warn("JetStream context unavailable during publish: #{e.message}. Reconnecting and retrying once.", tag: 'JetstreamBridge') reconnect! retry end end |
.publish! ⇒ Models::PublishResult
Publish variant that raises on error
373 374 375 376 377 378 379 380 381 |
# File 'lib/jetstream_bridge.rb', line 373 def publish!(...) result = publish(...) if result.failure? raise PublishError.new(result.error&., event_id: result.event_id, subject: result.subject) end result end |
.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult
Batch publish multiple events efficiently
395 396 397 398 399 |
# File 'lib/jetstream_bridge.rb', line 395 def publish_batch batch = BatchPublisher.new yield(batch) if block_given? batch.publish end |
.reconnect! ⇒ void
This method returns an undefined value.
Reconnect to NATS
Closes existing connection and establishes a new one. Useful for:
-
Forking web servers (Puma, Unicorn) after worker boot
-
Recovering from connection issues
-
Configuration changes that require reconnection
175 176 177 178 179 |
# File 'lib/jetstream_bridge.rb', line 175 def reconnect! Logging.info('Reconnecting to NATS...', tag: 'JetstreamBridge') shutdown! if @connection_initialized startup! end |
.reset! ⇒ void
This method returns an undefined value.
Reset all configuration and connection state.
Clears the config singleton and marks the connection as uninitialized. Primarily used in tests to restore a clean slate between examples.
139 140 141 142 143 |
# File 'lib/jetstream_bridge.rb', line 139 def reset! @config = nil @connection_initialized = false Consumer.reset_signal_handlers! if defined?(Consumer) end |
.shutdown! ⇒ void
This method returns an undefined value.
Gracefully shutdown the JetStream Bridge connection
Closes the NATS connection and cleans up resources. Should be called during application shutdown (e.g., in at_exit or signal handlers).
187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/jetstream_bridge.rb', line 187 def shutdown! return unless @connection_initialized begin nc = Connection.nc nc&.close if nc&.connected? Logging.info('JetStream Bridge shut down gracefully', tag: 'JetstreamBridge') rescue StandardError => e Logging.error("Error during shutdown: #{e.message}", tag: 'JetstreamBridge') ensure @connection_initialized = false end end |
.startup! ⇒ void
This method returns an undefined value.
Initialize the JetStream Bridge connection and topology
This method can be called explicitly if needed. It’s idempotent and safe to call multiple times.
152 153 154 155 156 157 158 159 |
# File 'lib/jetstream_bridge.rb', line 152 def startup! return if @connection_initialized config.validate! connect_and_provision! @connection_initialized = true Logging.info('JetStream Bridge started successfully', tag: 'JetstreamBridge') end |
.stream_info ⇒ Hash
Get stream information for the configured stream
311 312 313 |
# File 'lib/jetstream_bridge.rb', line 311 def stream_info fetch_stream_info end |
.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread
Convenience method to start consuming messages
Automatically establishes connection on first use if not already connected.
Supports two usage patterns:
-
With a block (recommended): consumer = JetstreamBridge.subscribe do |event|
puts "Received: #{event.type} on #{event.subject} (attempt #{event.deliveries})"end consumer.run!
-
With auto-run (returns Thread): thread = JetstreamBridge.subscribe(run: true) do |event|
puts "Received: #{event.type}"end thread.join # Wait for consumer to finish
-
With a handler object: handler = ->(event) { puts event.type } consumer = JetstreamBridge.subscribe(handler) consumer.run!
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/jetstream_bridge.rb', line 430 def subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil, &block) connect_if_needed! handler ||= block raise ArgumentError, 'Handler or block required' unless handler consumer = Consumer.new(handler, durable_name: durable_name, batch_size: batch_size) if run thread = Thread.new { consumer.run! } thread.abort_on_exception = true thread else consumer end end |
.use_dlq? ⇒ Boolean
Whether the dead letter queue is enabled.
218 219 220 |
# File 'lib/jetstream_bridge.rb', line 218 def use_dlq? config.use_dlq end |
.use_inbox? ⇒ Boolean
Whether the idempotent inbox pattern is enabled.
211 212 213 |
# File 'lib/jetstream_bridge.rb', line 211 def use_inbox? config.use_inbox end |
.use_outbox? ⇒ Boolean
Whether the transactional outbox pattern is enabled.
204 205 206 |
# File 'lib/jetstream_bridge.rb', line 204 def use_outbox? config.use_outbox end |