Module: JetstreamBridge

Extended by:
Core::BridgeHelpers
Defined in:
lib/jetstream_bridge.rb,
lib/jetstream_bridge/errors.rb,
lib/jetstream_bridge/version.rb,
lib/jetstream_bridge/core/config.rb,
lib/jetstream_bridge/core/logging.rb,
lib/jetstream_bridge/models/event.rb,
lib/jetstream_bridge/test_helpers.rb,
lib/jetstream_bridge/core/duration.rb,
lib/jetstream_bridge/rails/railtie.rb,
lib/jetstream_bridge/models/subject.rb,
lib/jetstream_bridge/core/connection.rb,
lib/jetstream_bridge/topology/stream.rb,
lib/jetstream_bridge/core/model_utils.rb,
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/core/debug_helper.rb,
lib/jetstream_bridge/rails/integration.rb,
lib/jetstream_bridge/topology/topology.rb,
lib/jetstream_bridge/core/config_preset.rb,
lib/jetstream_bridge/models/inbox_event.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/core/bridge_helpers.rb,
lib/jetstream_bridge/core/retry_strategy.rb,
lib/jetstream_bridge/models/outbox_event.rb,
lib/jetstream_bridge/publisher/publisher.rb,
lib/jetstream_bridge/models/event_envelope.rb,
lib/jetstream_bridge/models/publish_result.rb,
lib/jetstream_bridge/test_helpers/fixtures.rb,
lib/jetstream_bridge/test_helpers/matchers.rb,
lib/jetstream_bridge/consumer/dlq_publisher.rb,
lib/jetstream_bridge/core/model_codec_setup.rb,
lib/jetstream_bridge/test_helpers/mock_nats.rb,
lib/jetstream_bridge/topology/overlap_guard.rb,
lib/jetstream_bridge/core/connection_factory.rb,
lib/jetstream_bridge/topology/subject_matcher.rb,
lib/jetstream_bridge/publisher/batch_publisher.rb,
lib/jetstream_bridge/consumer/message_processor.rb,
lib/jetstream_bridge/publisher/outbox_repository.rb,
lib/jetstream_bridge/consumer/inbox/inbox_message.rb,
lib/jetstream_bridge/consumer/subscription_manager.rb,
lib/jetstream_bridge/consumer/inbox/inbox_processor.rb,
lib/jetstream_bridge/consumer/inbox/inbox_repository.rb,
lib/jetstream_bridge/test_helpers/integration_helpers.rb,
lib/generators/jetstream_bridge/install/install_generator.rb,
lib/generators/jetstream_bridge/migrations/migrations_generator.rb,
lib/generators/jetstream_bridge/initializer/initializer_generator.rb,
lib/generators/jetstream_bridge/health_check/health_check_generator.rb

Overview

JetstreamBridge

Defined Under Namespace

Modules: ConfigPreset, ConsumerMiddleware, Core, DebugHelper, Duration, Generators, Logging, ModelCodecSetup, ModelUtils, Models, Rails, StreamSupport, SubjectMatcher, TestHelpers Classes: BackoffStrategy, BatchPublishError, BatchPublisher, Config, ConfigurationError, Connection, ConnectionError, ConnectionNotEstablishedError, Consumer, ConsumerError, DlqError, DlqPublishFailedError, DlqPublisher, Error, ExponentialBackoffStrategy, HandlerError, HealthCheckFailedError, InboxError, InboxEvent, InboxMessage, InboxProcessor, InboxRepository, InvalidSubjectError, LinearBackoffStrategy, MessageContext, MessageProcessor, MissingConfigurationError, OutboxError, OutboxEvent, OutboxRepository, OverlapGuard, PublishError, PublishFailedError, Publisher, PublisherRetryStrategy, Railtie, RetryExhausted, RetryStrategy, Stream, StreamCreationFailedError, StreamNotFoundError, SubjectOverlapError, SubscriptionManager, Topology, TopologyError

Constant Summary collapse

VERSION =
'4.4.0'

Class Method Summary collapse

Class Method Details

.configObject



71
72
73
# File 'lib/jetstream_bridge.rb', line 71

def config
  @config ||= Config.new
end

.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config

Configure JetStream Bridge settings

This method sets configuration WITHOUT automatically establishing a connection. Connection must be established explicitly via startup! or will be established automatically on first use (publish/subscribe) or via Rails railtie initialization.

Examples:

Basic configuration

JetstreamBridge.configure do |config|
  config.nats_urls = "nats://localhost:4222"
  config.app_name = "my_app"
  config.destination_app = "worker"
end
JetstreamBridge.startup!  # Explicitly start connection

With hash overrides

JetstreamBridge.configure(env: 'production', app_name: 'my_app')

Parameters:

  • overrides (Hash) (defaults to: {})

    Configuration key-value pairs to set

Yields:

  • (Config)

    Configuration object for block-based configuration

Returns:

  • (Config)

    The configured instance



95
96
97
98
99
100
101
102
103
104
# File 'lib/jetstream_bridge.rb', line 95

def configure(overrides = {}, **extra_overrides)
  # Merge extra keyword arguments into overrides hash
  all_overrides = overrides.nil? ? extra_overrides : overrides.merge(extra_overrides)

  cfg = config
  all_overrides.each { |k, v| assign_config_option!(cfg, k, v) } unless all_overrides.empty?
  yield(cfg) if block_given?

  cfg
end

.configure_for(preset) {|Config| ... } ⇒ Config

Configure with a preset

This method applies a configuration preset. Connection must be established separately via startup! or via Rails railtie.

Examples:

JetstreamBridge.configure_for(:production) do |config|
  config.nats_urls = ENV["NATS_URLS"]
  config.app_name = "my_app"
  config.destination_app = "worker"
end
JetstreamBridge.startup!  # Explicitly start connection

Parameters:

  • preset (Symbol)

    Preset name (:development, :test, :production, etc.)

Yields:

  • (Config)

    Configuration object

Returns:

  • (Config)

    Configured instance



122
123
124
125
126
127
# File 'lib/jetstream_bridge.rb', line 122

def configure_for(preset)
  configure do |cfg|
    cfg.apply_preset(preset)
    yield(cfg) if block_given?
  end
end

.connect_and_ensure_stream!Object

Establishes a connection and ensures stream topology.

Returns:

  • (Object)

    JetStream context



202
203
204
205
206
207
# File 'lib/jetstream_bridge.rb', line 202

def connect_and_ensure_stream!
  Connection.connect!
  jts = Connection.jetstream
  Topology.ensure!(jts)
  jts
end

.connected?Boolean

Check if connected to NATS

Returns:

  • (Boolean)

    true if connected and healthy



288
289
290
291
292
# File 'lib/jetstream_bridge.rb', line 288

def connected?
  Connection.instance.connected?
rescue StandardError
  false
end

.ensure_topology!Object

Backwards-compatible alias for the previous method name



210
211
212
# File 'lib/jetstream_bridge.rb', line 210

def ensure_topology!
  connect_and_ensure_stream!
end

.health_check(skip_cache: false) ⇒ Hash

Active health check for monitoring and readiness probes

Performs actual operations to verify system health:

  • Checks NATS connection (active: calls account_info API)

  • Verifies stream exists and is accessible (active: queries stream info)

  • Tests NATS round-trip communication (active: RTT measurement)

Rate Limiting: To prevent abuse, uncached health checks are limited to once every 5 seconds. Cached results (within 30s TTL) bypass this limit via Connection.instance.connected?.

Parameters:

  • skip_cache (Boolean) (defaults to: false)

    Force fresh health check, bypass connection cache (rate limited)

Returns:

  • (Hash)

    Health status including NATS connection, stream, and version

Raises:



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/jetstream_bridge.rb', line 227

def health_check(skip_cache: false)
  # Rate limit uncached requests to prevent abuse (max 1 per 5 seconds)
  enforce_health_check_rate_limit! if skip_cache

  start_time = Time.now
  conn_instance = Connection.instance

  # Active check: calls @jts.account_info internally
  # Pass skip_cache to force fresh check if requested
  connected = conn_instance.connected?(skip_cache: skip_cache)
  connected_at = conn_instance.connected_at
  connection_state = conn_instance.state
  last_error = conn_instance.last_reconnect_error
  last_error_at = conn_instance.last_reconnect_error_at

  # Active check: queries actual stream from NATS server
  stream_info = connected ? fetch_stream_info : { exists: false, name: config.stream_name }

  # Active check: measure NATS round-trip time
  rtt_ms = measure_nats_rtt if connected

  health_check_duration_ms = ((Time.now - start_time) * 1000).round(2)

  {
    healthy: connected && stream_info&.fetch(:exists, false),
    connection: {
      state: connection_state,
      connected: connected,
      connected_at: connected_at&.iso8601,
      last_error: last_error&.message,
      last_error_at: last_error_at&.iso8601
    },
    stream: stream_info,
    performance: {
      nats_rtt_ms: rtt_ms,
      health_check_duration_ms: health_check_duration_ms
    },
    config: {
      env: config.env,
      app_name: config.app_name,
      destination_app: config.destination_app,
      use_outbox: config.use_outbox,
      use_inbox: config.use_inbox,
      use_dlq: config.use_dlq
    },
    version: JetstreamBridge::VERSION
  }
rescue StandardError => e
  {
    healthy: false,
    connection: {
      state: :failed,
      connected: false
    },
    error: "#{e.class}: #{e.message}"
  }
end

.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult

Convenience method to publish events

Automatically establishes connection on first use if not already connected.

Supports three usage patterns:

  1. Structured parameters (recommended): JetstreamBridge.publish(resource_type: ‘user’, event_type: ‘created’, payload: { id: 1, name: ‘Ada’ })

  2. Simplified hash (infers resource_type from event_type): JetstreamBridge.publish(event_type: ‘user.created’, payload: { id: 1, name: ‘Ada’ })

  3. Complete envelope (advanced): JetstreamBridge.publish({ event_type: ‘created’, resource_type: ‘user’, payload: …, event_id: ‘…’ })

Examples:

Check result status

result = JetstreamBridge.publish(event_type: "user.created", payload: { id: 1 })
if result.success?
  puts "Published event #{result.event_id}"
else
  logger.error("Publish failed: #{result.error}")
end

Parameters:

  • event_or_hash (Hash, nil) (defaults to: nil)

    Event hash or first positional argument

  • resource_type (String, nil) (defaults to: nil)

    Resource type (e.g., ‘user’, ‘order’)

  • event_type (String, nil) (defaults to: nil)

    Event type (e.g., ‘created’, ‘updated’, ‘user.created’)

  • payload (Hash, nil) (defaults to: nil)

    Event payload data

  • subject (String, nil) (defaults to: nil)

    Optional subject override

  • options (Hash)

    Additional options (event_id, occurred_at, trace_id)

Returns:



331
332
333
334
335
336
# File 'lib/jetstream_bridge.rb', line 331

def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **)
  connect_if_needed!
  publisher = Publisher.new
  publisher.publish(event_or_hash, resource_type: resource_type, event_type: event_type, payload: payload,
                                   subject: subject, **)
end

.publish!Models::PublishResult

Publish variant that raises on error

Examples:

JetstreamBridge.publish!(event_type: "user.created", payload: { id: 1 })
# Raises PublishError if publishing fails

Returns:

Raises:



347
348
349
350
351
352
353
354
355
# File 'lib/jetstream_bridge.rb', line 347

def publish!(...)
  result = publish(...)
  if result.failure?
    raise PublishError.new(result.error&.message, event_id: result.event_id,
                                                  subject: result.subject)
  end

  result
end

.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult

Batch publish multiple events efficiently

Examples:

results = JetstreamBridge.publish_batch do |batch|
  users.each do |user|
    batch.add(event_type: "user.created", payload: { id: user.id })
  end
end
puts "Success: #{results.successful_count}, Failed: #{results.failed_count}"

Yields:

Returns:



369
370
371
372
373
# File 'lib/jetstream_bridge.rb', line 369

def publish_batch
  batch = BatchPublisher.new
  yield(batch) if block_given?
  batch.publish
end

.reconnect!void

This method returns an undefined value.

Reconnect to NATS

Closes existing connection and establishes a new one. Useful for:

  • Forking web servers (Puma, Unicorn) after worker boot

  • Recovering from connection issues

  • Configuration changes that require reconnection

Examples:

In Puma configuration (config/puma.rb)

on_worker_boot do
  JetstreamBridge.reconnect! if defined?(JetstreamBridge)
end

Raises:



161
162
163
164
165
# File 'lib/jetstream_bridge.rb', line 161

def reconnect!
  Logging.info('Reconnecting to NATS...', tag: 'JetstreamBridge')
  shutdown! if @connection_initialized
  startup!
end

.reset!Object



129
130
131
132
# File 'lib/jetstream_bridge.rb', line 129

def reset!
  @config = nil
  @connection_initialized = false
end

.shutdown!void

This method returns an undefined value.

Gracefully shutdown the JetStream Bridge connection

Closes the NATS connection and cleans up resources. Should be called during application shutdown (e.g., in at_exit or signal handlers).



173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/jetstream_bridge.rb', line 173

def shutdown!
  return unless @connection_initialized

  begin
    nc = Connection.nc
    nc&.close if nc&.connected?
    Logging.info('JetStream Bridge shut down gracefully', tag: 'JetstreamBridge')
  rescue StandardError => e
    Logging.error("Error during shutdown: #{e.message}", tag: 'JetstreamBridge')
  ensure
    @connection_initialized = false
  end
end

.startup!void

This method returns an undefined value.

Initialize the JetStream Bridge connection and topology

This method can be called explicitly if needed. It’s idempotent and safe to call multiple times.



139
140
141
142
143
144
145
# File 'lib/jetstream_bridge.rb', line 139

def startup!
  return if @connection_initialized

  connect_and_ensure_stream!
  @connection_initialized = true
  Logging.info('JetStream Bridge started successfully', tag: 'JetstreamBridge')
end

.stream_infoHash

Get stream information for the configured stream

Returns:

  • (Hash)

    Stream information including subjects and message count



297
298
299
# File 'lib/jetstream_bridge.rb', line 297

def stream_info
  fetch_stream_info
end

.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread

Convenience method to start consuming messages

Automatically establishes connection on first use if not already connected.

Supports two usage patterns:

  1. With a block (recommended): consumer = JetstreamBridge.subscribe do |event|

    puts "Received: #{event.type} on #{event.subject} (attempt #{event.deliveries})"
    

    end consumer.run!

  2. With auto-run (returns Thread): thread = JetstreamBridge.subscribe(run: true) do |event|

    puts "Received: #{event.type}"
    

    end thread.join # Wait for consumer to finish

  3. With a handler object: handler = ->(event) { puts event.type } consumer = JetstreamBridge.subscribe(handler) consumer.run!

Parameters:

  • handler (Proc, #call, nil) (defaults to: nil)

    Message handler (optional if block given)

  • run (Boolean) (defaults to: false)

    If true, automatically runs consumer in a background thread

  • durable_name (String, nil) (defaults to: nil)

    Optional durable consumer name override

  • batch_size (Integer, nil) (defaults to: nil)

    Optional batch size override

Yields:

  • (event)

    Yields Models::Event object to block

Returns:

  • (Consumer, Thread)

    Consumer instance or Thread if run: true

Raises:

  • (ArgumentError)


404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/jetstream_bridge.rb', line 404

def subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil, &block)
  connect_if_needed!
  handler ||= block
  raise ArgumentError, 'Handler or block required' unless handler

  consumer = Consumer.new(handler, durable_name: durable_name, batch_size: batch_size)

  if run
    thread = Thread.new { consumer.run! }
    thread.abort_on_exception = true
    thread
  else
    consumer
  end
end

.use_dlq?Boolean

Returns:

  • (Boolean)


195
196
197
# File 'lib/jetstream_bridge.rb', line 195

def use_dlq?
  config.use_dlq
end

.use_inbox?Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/jetstream_bridge.rb', line 191

def use_inbox?
  config.use_inbox
end

.use_outbox?Boolean

Returns:

  • (Boolean)


187
188
189
# File 'lib/jetstream_bridge.rb', line 187

def use_outbox?
  config.use_outbox
end