Module: JetstreamBridge

Extended by:
Core::BridgeHelpers
Defined in:
lib/jetstream_bridge.rb,
lib/jetstream_bridge/errors.rb,
lib/jetstream_bridge/version.rb,
lib/jetstream_bridge/core/config.rb,
lib/jetstream_bridge/provisioner.rb,
lib/jetstream_bridge/core/logging.rb,
lib/jetstream_bridge/models/event.rb,
lib/jetstream_bridge/test_helpers.rb,
lib/jetstream_bridge/core/duration.rb,
lib/jetstream_bridge/rails/railtie.rb,
lib/jetstream_bridge/config_helpers.rb,
lib/jetstream_bridge/models/subject.rb,
lib/jetstream_bridge/core/connection.rb,
lib/jetstream_bridge/topology/stream.rb,
lib/jetstream_bridge/core/model_utils.rb,
lib/jetstream_bridge/consumer/consumer.rb,
lib/jetstream_bridge/core/debug_helper.rb,
lib/jetstream_bridge/rails/integration.rb,
lib/jetstream_bridge/topology/topology.rb,
lib/jetstream_bridge/core/config_preset.rb,
lib/jetstream_bridge/models/inbox_event.rb,
lib/jetstream_bridge/consumer/middleware.rb,
lib/jetstream_bridge/core/bridge_helpers.rb,
lib/jetstream_bridge/core/retry_strategy.rb,
lib/jetstream_bridge/models/outbox_event.rb,
lib/jetstream_bridge/publisher/publisher.rb,
lib/jetstream_bridge/models/event_envelope.rb,
lib/jetstream_bridge/models/publish_result.rb,
lib/jetstream_bridge/test_helpers/fixtures.rb,
lib/jetstream_bridge/test_helpers/matchers.rb,
lib/jetstream_bridge/consumer/dlq_publisher.rb,
lib/jetstream_bridge/core/model_codec_setup.rb,
lib/jetstream_bridge/test_helpers/mock_nats.rb,
lib/jetstream_bridge/topology/overlap_guard.rb,
lib/jetstream_bridge/consumer/consumer_state.rb,
lib/jetstream_bridge/core/connection_factory.rb,
lib/jetstream_bridge/config_helpers/lifecycle.rb,
lib/jetstream_bridge/topology/subject_matcher.rb,
lib/jetstream_bridge/publisher/batch_publisher.rb,
lib/jetstream_bridge/consumer/message_processor.rb,
lib/jetstream_bridge/core/consumer_mode_resolver.rb,
lib/jetstream_bridge/publisher/outbox_repository.rb,
lib/jetstream_bridge/consumer/inbox/inbox_message.rb,
lib/jetstream_bridge/consumer/subscription_manager.rb,
lib/jetstream_bridge/consumer/inbox/inbox_processor.rb,
lib/jetstream_bridge/consumer/inbox/inbox_repository.rb,
lib/jetstream_bridge/test_helpers/integration_helpers.rb,
lib/jetstream_bridge/consumer/pull_subscription_builder.rb,
lib/generators/jetstream_bridge/install/install_generator.rb,
lib/generators/jetstream_bridge/migrations/migrations_generator.rb,
lib/generators/jetstream_bridge/initializer/initializer_generator.rb,
lib/generators/jetstream_bridge/health_check/health_check_generator.rb

Overview

JetstreamBridge

Defined Under Namespace

Modules: ConfigHelpers, ConfigPreset, ConsumerMiddleware, ConsumerModeResolver, Core, DebugHelper, Duration, Generators, Logging, ModelCodecSetup, ModelUtils, Models, Rails, StreamSupport, SubjectMatcher, TestHelpers Classes: BackoffStrategy, BatchPublishError, BatchPublisher, Config, ConfigurationError, Connection, ConnectionError, ConnectionNotEstablishedError, Consumer, ConsumerError, ConsumerProvisioningError, DlqError, DlqPublishFailedError, DlqPublisher, Error, ExponentialBackoffStrategy, HandlerError, HealthCheckFailedError, InboxError, InboxEvent, InboxMessage, InboxProcessor, InboxRepository, InvalidSubjectError, LinearBackoffStrategy, MessageContext, MessageProcessor, MissingConfigurationError, OutboxError, OutboxEvent, OutboxRepository, OverlapGuard, Provisioner, PublishError, PublishFailedError, Publisher, PublisherRetryStrategy, PullSubscriptionBuilder, Railtie, RetryExhausted, RetryStrategy, Stream, StreamCreationFailedError, StreamNotFoundError, SubjectOverlapError, SubscriptionManager, Topology, TopologyError

Constant Summary collapse

VERSION =
'7.1.4'

Class Method Summary collapse

Class Method Details

.configConfig

Returns the current configuration, creating a default instance if needed.

Returns:

  • (Config)

    The current configuration object



74
75
76
# File 'lib/jetstream_bridge.rb', line 74

def config
  @config ||= Config.new
end

.configure(overrides = {}, **extra_overrides) {|Config| ... } ⇒ Config

Configure JetStream Bridge settings

This method sets configuration WITHOUT automatically establishing a connection. Connection must be established explicitly via startup! or will be established automatically on first use (publish/subscribe) or via Rails railtie initialization.

Examples:

Basic configuration

JetstreamBridge.configure do |config|
  config.nats_urls = "nats://localhost:4222"
  config.app_name = "my_app"
  config.destination_app = "worker"
end
JetstreamBridge.startup!  # Explicitly start connection

With hash overrides

JetstreamBridge.configure(app_name: 'my_app')

Parameters:

  • overrides (Hash) (defaults to: {})

    Configuration key-value pairs to set

  • extra_overrides (Hash)

    Additional keyword arguments merged into overrides

Yields:

  • (Config)

    Configuration object for block-based configuration

Returns:

  • (Config)

    The configured instance



99
100
101
102
103
104
105
106
107
108
# File 'lib/jetstream_bridge.rb', line 99

def configure(overrides = {}, **extra_overrides)
  # Merge extra keyword arguments into overrides hash
  all_overrides = overrides.nil? ? extra_overrides : overrides.merge(extra_overrides)

  cfg = config
  all_overrides.each { |k, v| assign_config_option!(cfg, k, v) } unless all_overrides.empty?
  yield(cfg) if block_given?

  cfg
end

.configure_for(preset) {|Config| ... } ⇒ Config

Configure with a preset

This method applies a configuration preset. Connection must be established separately via startup! or via Rails railtie.

Examples:

JetstreamBridge.configure_for(:production) do |config|
  config.nats_urls = ENV["NATS_URLS"]
  config.app_name = "my_app"
  config.destination_app = "worker"
end
JetstreamBridge.startup!  # Explicitly start connection

Parameters:

  • preset (Symbol)

    Preset name (:development, :test, :production, etc.)

Yields:

  • (Config)

    Configuration object

Returns:

  • (Config)

    Configured instance



126
127
128
129
130
131
# File 'lib/jetstream_bridge.rb', line 126

def configure_for(preset)
  configure do |cfg|
    cfg.apply_preset(preset)
    yield(cfg) if block_given?
  end
end

.connect_and_provision!Object

Establishes a connection and provisions stream topology.

Returns:

  • (Object)

    JetStream context

Raises:



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/jetstream_bridge.rb', line 225

def connect_and_provision!
  config.validate!
  provision = config.auto_provision
  Connection.connect!(verify_js: provision)
  jts = Connection.jetstream
  raise ConnectionNotEstablishedError, 'JetStream connection not available' unless jts

  if provision
    Provisioner.new(config: config).provision_stream!(jts: jts)
  else
    Logging.info(
      'auto_provision=false: skipping stream provisioning and JetStream account_info. ' \
      'Run `bundle exec rake jetstream_bridge:provision` with admin credentials to create/update topology.',
      tag: 'JetstreamBridge'
    )
  end

  jts
end

.connected?Boolean

Check if connected to NATS

Returns:

  • (Boolean)

    true if connected and healthy



302
303
304
305
306
# File 'lib/jetstream_bridge.rb', line 302

def connected?
  Connection.instance.connected?
rescue StandardError
  false
end

.health_check(skip_cache: false) ⇒ Hash

Active health check for monitoring and readiness probes

Performs actual operations to verify system health:

  • Checks NATS connection (active: calls account_info API)

  • Verifies stream exists and is accessible (active: queries stream info)

  • Tests NATS round-trip communication (active: RTT measurement)

Rate Limiting: To prevent abuse, uncached health checks are limited to once every 5 seconds. Cached results (within 30s TTL) bypass this limit via Connection.instance.connected?.

Parameters:

  • skip_cache (Boolean) (defaults to: false)

    Force fresh health check, bypass connection cache (rate limited)

Returns:

  • (Hash)

    Health status including NATS connection, stream, and version

Raises:



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/jetstream_bridge.rb', line 267

def health_check(skip_cache: false)
  # Rate limit uncached requests to prevent abuse (max 1 per 5 seconds)
  enforce_health_check_rate_limit! if skip_cache

  start_time = Time.now
  conn_status = connection_snapshot(Connection.instance, skip_cache: skip_cache)
  stream_info = stream_status(conn_status[:connected])
  rtt_ms = measure_nats_rtt if conn_status[:connected]
  health_check_duration_ms = elapsed_ms(start_time)

  {
    healthy: health_flag(conn_status[:connected], stream_info),
    connection: connection_payload(conn_status),
    stream: stream_info,
    performance: {
      nats_rtt_ms: rtt_ms,
      health_check_duration_ms: health_check_duration_ms
    },
    config: config_summary,
    version: JetstreamBridge::VERSION
  }
rescue StandardError => e
  {
    healthy: false,
    connection: {
      state: :failed,
      connected: false
    },
    error: "#{e.class}: #{e.message}"
  }
end

.provision!(provision_consumer: true) ⇒ Object

Provision stream/consumer using management credentials (out of band from runtime).

Parameters:

  • provision_consumer (Boolean) (defaults to: true)

    Whether to create/align the consumer along with the stream.

Returns:

  • (Object)

    JetStream context



249
250
251
252
# File 'lib/jetstream_bridge.rb', line 249

def provision!(provision_consumer: true)
  config.validate!
  Provisioner.new(config: config).provision!(provision_consumer: provision_consumer)
end

.publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil) ⇒ Models::PublishResult

Convenience method to publish events

Automatically establishes connection on first use if not already connected.

Supports three usage patterns:

  1. Structured parameters (recommended): JetstreamBridge.publish(resource_type: ‘user’, event_type: ‘created’, payload: { id: 1, name: ‘Ada’ })

  2. Simplified hash (infers resource_type from event_type): JetstreamBridge.publish(event_type: ‘user.created’, payload: { id: 1, name: ‘Ada’ })

  3. Complete envelope (advanced): JetstreamBridge.publish({ event_type: ‘created’, resource_type: ‘user’, payload: …, event_id: ‘…’ })

Examples:

Check result status

result = JetstreamBridge.publish(event_type: "user.created", payload: { id: 1 })
if result.success?
  puts "Published event #{result.event_id}"
else
  logger.error("Publish failed: #{result.error}")
end

Parameters:

  • event_or_hash (Hash, nil) (defaults to: nil)

    Event hash or first positional argument

  • resource_type (String, nil) (defaults to: nil)

    Resource type (e.g., ‘user’, ‘order’)

  • event_type (String, nil) (defaults to: nil)

    Event type (e.g., ‘created’, ‘updated’, ‘user.created’)

  • payload (Hash, nil) (defaults to: nil)

    Event payload data

  • subject (String, nil) (defaults to: nil)

    Optional subject override

  • kwargs (Hash)

    Additional keyword arguments forwarded to Publisher#publish (event_id, occurred_at, trace_id)

Returns:



346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/jetstream_bridge.rb', line 346

def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **)
  retried_after_reconnect = false
  begin
    connect_if_needed!
    publisher = Publisher.new
    publisher.publish(event_or_hash, resource_type: resource_type, event_type: event_type, payload: payload,
                                     subject: subject, **)
  rescue ConnectionNotEstablishedError => e
    raise if retried_after_reconnect

    retried_after_reconnect = true
    Logging.warn("JetStream context unavailable during publish: #{e.message}. Reconnecting and retrying once.",
                 tag: 'JetstreamBridge')
    reconnect!
    retry
  end
end

.publish!Models::PublishResult

Publish variant that raises on error

Examples:

JetstreamBridge.publish!(event_type: "user.created", payload: { id: 1 })
# Raises PublishError if publishing fails

Returns:

Raises:



373
374
375
376
377
378
379
380
381
# File 'lib/jetstream_bridge.rb', line 373

def publish!(...)
  result = publish(...)
  if result.failure?
    raise PublishError.new(result.error&.message, event_id: result.event_id,
                                                  subject: result.subject)
  end

  result
end

.publish_batch {|BatchPublisher| ... } ⇒ BatchPublisher::BatchResult

Batch publish multiple events efficiently

Examples:

results = JetstreamBridge.publish_batch do |batch|
  users.each do |user|
    batch.add(event_type: "user.created", payload: { id: user.id })
  end
end
puts "Success: #{results.successful_count}, Failed: #{results.failed_count}"

Yields:

Returns:



395
396
397
398
399
# File 'lib/jetstream_bridge.rb', line 395

def publish_batch
  batch = BatchPublisher.new
  yield(batch) if block_given?
  batch.publish
end

.reconnect!void

This method returns an undefined value.

Reconnect to NATS

Closes existing connection and establishes a new one. Useful for:

  • Forking web servers (Puma, Unicorn) after worker boot

  • Recovering from connection issues

  • Configuration changes that require reconnection

Examples:

In Puma configuration (config/puma.rb)

on_worker_boot do
  JetstreamBridge.reconnect! if defined?(JetstreamBridge)
end

Raises:



175
176
177
178
179
# File 'lib/jetstream_bridge.rb', line 175

def reconnect!
  Logging.info('Reconnecting to NATS...', tag: 'JetstreamBridge')
  shutdown! if @connection_initialized
  startup!
end

.reset!void

This method returns an undefined value.

Reset all configuration and connection state.

Clears the config singleton and marks the connection as uninitialized. Primarily used in tests to restore a clean slate between examples.



139
140
141
142
143
# File 'lib/jetstream_bridge.rb', line 139

def reset!
  @config = nil
  @connection_initialized = false
  Consumer.reset_signal_handlers! if defined?(Consumer)
end

.shutdown!void

This method returns an undefined value.

Gracefully shutdown the JetStream Bridge connection

Closes the NATS connection and cleans up resources. Should be called during application shutdown (e.g., in at_exit or signal handlers).



187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/jetstream_bridge.rb', line 187

def shutdown!
  return unless @connection_initialized

  begin
    nc = Connection.nc
    nc&.close if nc&.connected?
    Logging.info('JetStream Bridge shut down gracefully', tag: 'JetstreamBridge')
  rescue StandardError => e
    Logging.error("Error during shutdown: #{e.message}", tag: 'JetstreamBridge')
  ensure
    @connection_initialized = false
  end
end

.startup!void

This method returns an undefined value.

Initialize the JetStream Bridge connection and topology

This method can be called explicitly if needed. It’s idempotent and safe to call multiple times.

Raises:



152
153
154
155
156
157
158
159
# File 'lib/jetstream_bridge.rb', line 152

def startup!
  return if @connection_initialized

  config.validate!
  connect_and_provision!
  @connection_initialized = true
  Logging.info('JetStream Bridge started successfully', tag: 'JetstreamBridge')
end

.stream_infoHash

Get stream information for the configured stream

Returns:

  • (Hash)

    Stream information including subjects and message count



311
312
313
# File 'lib/jetstream_bridge.rb', line 311

def stream_info
  fetch_stream_info
end

.subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil) {|event| ... } ⇒ Consumer, Thread

Convenience method to start consuming messages

Automatically establishes connection on first use if not already connected.

Supports two usage patterns:

  1. With a block (recommended): consumer = JetstreamBridge.subscribe do |event|

    puts "Received: #{event.type} on #{event.subject} (attempt #{event.deliveries})"
    

    end consumer.run!

  2. With auto-run (returns Thread): thread = JetstreamBridge.subscribe(run: true) do |event|

    puts "Received: #{event.type}"
    

    end thread.join # Wait for consumer to finish

  3. With a handler object: handler = ->(event) { puts event.type } consumer = JetstreamBridge.subscribe(handler) consumer.run!

Parameters:

  • handler (Proc, #call, nil) (defaults to: nil)

    Message handler (optional if block given)

  • run (Boolean) (defaults to: false)

    If true, automatically runs consumer in a background thread

  • durable_name (String, nil) (defaults to: nil)

    Optional durable consumer name override

  • batch_size (Integer, nil) (defaults to: nil)

    Optional batch size override

Yields:

  • (event)

    Yields Models::Event object to block

Returns:

  • (Consumer, Thread)

    Consumer instance or Thread if run: true

Raises:

  • (ArgumentError)


430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/jetstream_bridge.rb', line 430

def subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil, &block)
  connect_if_needed!
  handler ||= block
  raise ArgumentError, 'Handler or block required' unless handler

  consumer = Consumer.new(handler, durable_name: durable_name, batch_size: batch_size)

  if run
    thread = Thread.new { consumer.run! }
    thread.abort_on_exception = true
    thread
  else
    consumer
  end
end

.use_dlq?Boolean

Whether the dead letter queue is enabled.

Returns:

  • (Boolean)


218
219
220
# File 'lib/jetstream_bridge.rb', line 218

def use_dlq?
  config.use_dlq
end

.use_inbox?Boolean

Whether the idempotent inbox pattern is enabled.

Returns:

  • (Boolean)


211
212
213
# File 'lib/jetstream_bridge.rb', line 211

def use_inbox?
  config.use_inbox
end

.use_outbox?Boolean

Whether the transactional outbox pattern is enabled.

Returns:

  • (Boolean)


204
205
206
# File 'lib/jetstream_bridge.rb', line 204

def use_outbox?
  config.use_outbox
end