Class: JetstreamBridge::Connection

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/jetstream_bridge/core/connection.rb

Overview

Singleton connection to NATS with thread-safe initialization.

This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.

Thread Safety:

  • Connection initialization is synchronized with a mutex

  • The singleton pattern ensures only one connection instance exists

  • Safe to call from multiple threads/workers simultaneously

Example:

# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)

Constant Summary collapse

DEFAULT_CONN_OPTS =
{
  reconnect: true,
  reconnect_time_wait: 2,
  max_reconnect_attempts: 10,
  connect_timeout: 5
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connected_atTime? (readonly)

Public API for getting connection timestamp

Returns:

  • (Time, nil)

    timestamp when connection was established



89
90
91
# File 'lib/jetstream_bridge/core/connection.rb', line 89

def connected_at
  @connected_at
end

Class Method Details

.connect!NATS::JetStream::JS

Thread-safe delegator to the singleton instance. Returns a live JetStream context.

Safe to call from multiple threads - uses mutex for synchronization.

Returns:

  • (NATS::JetStream::JS)

    JetStream context



44
45
46
47
# File 'lib/jetstream_bridge/core/connection.rb', line 44

def connect!
  @__mutex ||= Mutex.new
  @__mutex.synchronize { instance.connect! }
end

.jetstreamObject



54
55
56
# File 'lib/jetstream_bridge/core/connection.rb', line 54

def jetstream
  instance.__send__(:jetstream)
end

.ncObject

Optional accessors if callers need raw handles



50
51
52
# File 'lib/jetstream_bridge/core/connection.rb', line 50

def nc
  instance.__send__(:nc)
end

Instance Method Details

#connect!Object

Idempotent: returns an existing, healthy JetStream context or establishes one.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/jetstream_bridge/core/connection.rb', line 60

def connect!
  return @jts if connected?

  servers = nats_servers
  raise 'No NATS URLs configured' if servers.empty?

  establish_connection(servers)

  Logging.info(
    "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \
    "#{sanitize_urls(servers).join(', ')}",
    tag: 'JetstreamBridge::Connection'
  )

  # Ensure topology (streams, subjects, overlap guard, etc.)
  Topology.ensure!(@jts)

  @connected_at = Time.now.utc
  @jts
end

#connected?Boolean

Public API for checking connection status

Returns:

  • (Boolean)

    true if NATS client is connected and JetStream is healthy



83
84
85
# File 'lib/jetstream_bridge/core/connection.rb', line 83

def connected?
  @nc&.connected? && @jts && jetstream_healthy?
end