Class: JetstreamBridge::Connection
- Inherits:
-
Object
- Object
- JetstreamBridge::Connection
- Includes:
- Singleton
- Defined in:
- lib/jetstream_bridge/core/connection.rb
Overview
Singleton connection to NATS with thread-safe initialization.
This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.
Thread Safety:
-
Connection initialization is synchronized with a mutex
-
The singleton pattern ensures only one connection instance exists
-
Safe to call from multiple threads/workers simultaneously
Example:
# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)
Constant Summary collapse
- DEFAULT_CONN_OPTS =
{ reconnect: true, reconnect_time_wait: 2, max_reconnect_attempts: 10, connect_timeout: 5 }.freeze
Instance Attribute Summary collapse
-
#connected_at ⇒ Time?
readonly
Public API for getting connection timestamp.
Class Method Summary collapse
-
.connect! ⇒ NATS::JetStream::JS
Thread-safe delegator to the singleton instance.
- .jetstream ⇒ Object
-
.nc ⇒ Object
Optional accessors if callers need raw handles.
Instance Method Summary collapse
-
#connect! ⇒ Object
Idempotent: returns an existing, healthy JetStream context or establishes one.
-
#connected? ⇒ Boolean
Public API for checking connection status.
Instance Attribute Details
#connected_at ⇒ Time? (readonly)
Public API for getting connection timestamp
89 90 91 |
# File 'lib/jetstream_bridge/core/connection.rb', line 89 def connected_at @connected_at end |
Class Method Details
.connect! ⇒ NATS::JetStream::JS
Thread-safe delegator to the singleton instance. Returns a live JetStream context.
Safe to call from multiple threads - uses mutex for synchronization.
44 45 46 47 |
# File 'lib/jetstream_bridge/core/connection.rb', line 44 def connect! @__mutex ||= Mutex.new @__mutex.synchronize { instance.connect! } end |
.jetstream ⇒ Object
54 55 56 |
# File 'lib/jetstream_bridge/core/connection.rb', line 54 def jetstream instance.__send__(:jetstream) end |
.nc ⇒ Object
Optional accessors if callers need raw handles
50 51 52 |
# File 'lib/jetstream_bridge/core/connection.rb', line 50 def nc instance.__send__(:nc) end |
Instance Method Details
#connect! ⇒ Object
Idempotent: returns an existing, healthy JetStream context or establishes one.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/jetstream_bridge/core/connection.rb', line 60 def connect! return @jts if connected? servers = nats_servers raise 'No NATS URLs configured' if servers.empty? establish_connection(servers) Logging.info( "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \ "#{sanitize_urls(servers).join(', ')}", tag: 'JetstreamBridge::Connection' ) # Ensure topology (streams, subjects, overlap guard, etc.) Topology.ensure!(@jts) @connected_at = Time.now.utc @jts end |
#connected? ⇒ Boolean
Public API for checking connection status
83 84 85 |
# File 'lib/jetstream_bridge/core/connection.rb', line 83 def connected? @nc&.connected? && @jts && jetstream_healthy? end |