Class: JetstreamBridge::Connection
- Inherits:
- Object
- Includes:
- Singleton
- Defined in:
- lib/jetstream_bridge/core/connection.rb
Overview
Singleton connection to NATS with thread-safe initialization.
This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.
Thread Safety:
- Connection initialization is synchronized with a mutex
- The singleton pattern ensures only one connection instance exists
- Safe to call from multiple threads/workers simultaneously
Example:
# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)
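The thread-safety guarantees above can be illustrated without a NATS server. The following is a minimal sketch, not the gem's implementation: `FakeConnection`, its `LOCK` constant, and `connect_count` are hypothetical stand-ins that only mimic the singleton-plus-mutex shape described in this overview.

```ruby
require 'singleton'

# Hypothetical stand-in for the real class: it counts how often the
# expensive "connect" step runs instead of talking to NATS.
class FakeConnection
  include Singleton

  LOCK = Mutex.new

  attr_reader :connect_count

  def self.connect!
    # Class-level lock: only one thread at a time may run instance.connect!
    LOCK.synchronize { instance.connect! }
  end

  def initialize
    @connect_count = 0
    @context = nil
  end

  def connect!
    # Idempotent: reuse the existing context when there is one
    return @context if @context

    @connect_count += 1
    sleep 0.01 # simulate network latency
    @context = Object.new
  end
end

# Eight threads race to connect; all of them receive the same context.
contexts = Array.new(8) { Thread.new { FakeConnection.connect! } }.map(&:value)

puts contexts.uniq.size                    # → 1
puts FakeConnection.instance.connect_count # → 1
```

Because every caller goes through the same lock, the first thread performs the connect and the remaining threads simply receive the already-established context.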
Defined Under Namespace
Modules: State
Constant Summary
- DEFAULT_CONN_OPTS =
{ reconnect: true, reconnect_time_wait: 2, max_reconnect_attempts: 10, connect_timeout: 5 }.freeze
- VALID_NATS_SCHEMES =
%w[nats nats+tls].freeze
- @@connection_lock =
Class-level mutex for thread-safe connection initialization. A class variable is used to avoid a race condition in mutex creation.
Mutex.new
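How the class consumes these constants is not shown on this page. As a rough sketch of one plausible use, the snippet below merges caller overrides into the default options and checks a server URL against the allowed schemes. The helpers `valid_nats_url?` and `connection_options` are hypothetical names introduced for illustration only.

```ruby
require 'uri'

# Values copied from the constants listed above.
DEFAULT_CONN_OPTS = {
  reconnect: true,
  reconnect_time_wait: 2,
  max_reconnect_attempts: 10,
  connect_timeout: 5
}.freeze
VALID_NATS_SCHEMES = %w[nats nats+tls].freeze

# Hypothetical helper: true when the URL parses and uses an allowed scheme.
def valid_nats_url?(url)
  VALID_NATS_SCHEMES.include?(URI.parse(url).scheme)
rescue URI::InvalidURIError
  false
end

# Hypothetical helper: caller-supplied options win over the defaults.
def connection_options(overrides = {})
  DEFAULT_CONN_OPTS.merge(overrides)
end

puts valid_nats_url?('nats://localhost:4222')            # → true
puts valid_nats_url?('http://localhost:4222')            # → false
puts connection_options(connect_timeout: 10)[:connect_timeout] # → 10
```

Freezing the defaults and merging into a new hash keeps the shared constant immutable while still allowing per-call overrides.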
Instance Attribute Summary
- #connected_at ⇒ Time? (readonly)
  Public API for getting connection timestamp.
- #last_reconnect_error ⇒ Object (readonly)
  Last reconnection error metadata (exposed for health checks/diagnostics).
- #last_reconnect_error_at ⇒ Object (readonly)
  Last reconnection error metadata (exposed for health checks/diagnostics).
Class Method Summary
- .connect! ⇒ NATS::JetStream::JS
  Thread-safe delegator to the singleton instance.
- .jetstream ⇒ Object
- .nc ⇒ Object
  Optional accessors if callers need raw handles.
Instance Method Summary
- #connect! ⇒ Object
  Idempotent: returns an existing, healthy JetStream context or establishes one.
- #connected?(skip_cache: false) ⇒ Boolean
  Public API for checking connection status.
- #state ⇒ Symbol
  Get current connection state.
Instance Attribute Details
#connected_at ⇒ Time? (readonly)
Public API for getting connection timestamp
# File 'lib/jetstream_bridge/core/connection.rb', line 136
def connected_at
  @connected_at
end
#last_reconnect_error ⇒ Object (readonly)
Last reconnection error metadata (exposed for health checks/diagnostics)
# File 'lib/jetstream_bridge/core/connection.rb', line 139
def last_reconnect_error
  @last_reconnect_error
end
#last_reconnect_error_at ⇒ Object (readonly)
Last reconnection error metadata (exposed for health checks/diagnostics)
# File 'lib/jetstream_bridge/core/connection.rb', line 139
def last_reconnect_error_at
  @last_reconnect_error_at
end
Class Method Details
.connect! ⇒ NATS::JetStream::JS
Thread-safe delegator to the singleton instance. Returns a live JetStream context.
Safe to call from multiple threads - uses class-level mutex for synchronization.
# File 'lib/jetstream_bridge/core/connection.rb', line 61
def connect!
  @@connection_lock.synchronize { instance.connect! }
end
.jetstream ⇒ Object
# File 'lib/jetstream_bridge/core/connection.rb', line 70
def jetstream
  instance.__send__(:jetstream)
end
.nc ⇒ Object
Optional accessors if callers need raw handles
# File 'lib/jetstream_bridge/core/connection.rb', line 66
def nc
  instance.__send__(:nc)
end
Instance Method Details
#connect! ⇒ Object
Idempotent: returns an existing, healthy JetStream context or establishes one.
# File 'lib/jetstream_bridge/core/connection.rb', line 76
def connect!
  # Check if already connected without acquiring mutex (for performance)
  return @jts if @jts && @nc&.connected?

  servers = nats_servers
  raise 'No NATS URLs configured' if servers.empty?

  @state = State::CONNECTING
  establish_connection_with_retry(servers)

  Logging.info(
    "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \
    "#{sanitize_urls(servers).join(', ')}",
    tag: 'JetstreamBridge::Connection'
  )

  # Ensure topology (streams, subjects, overlap guard, etc.)
  Topology.ensure!(@jts)

  @connected_at = Time.now.utc
  @state = State::CONNECTED
  @jts
rescue StandardError
  @state = State::FAILED
  cleanup_connection!
  raise
end
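The control flow of the method above (fast path, state transition, cleanup and re-raise on failure) can be exercised in isolation. The sketch below is an illustration under assumptions: `MiniConnection`, its symbol states, and the injected `connector` callable are hypothetical and replace the gem's real helpers such as `establish_connection_with_retry` and `Topology.ensure!`.

```ruby
# Hypothetical miniature of the connect! flow: the state symbols and the
# connector callable are illustrative, not the gem's real internals.
class MiniConnection
  attr_reader :state, :attempts

  def initialize(connector)
    @connector = connector # callable that returns a context or raises
    @state = :disconnected
    @attempts = 0
    @context = nil
  end

  def connect!
    return @context if @context # idempotent fast path

    @state = :connecting
    @attempts += 1
    @context = @connector.call
    @state = :connected
    @context
  rescue StandardError
    # Same shape as the rescue branch above: mark failure, drop state, re-raise
    @state = :failed
    @context = nil
    raise
  end
end

ok = MiniConnection.new(-> { :jetstream_context })
ok.connect!
ok.connect!
puts ok.attempts # → 1 (the second call reuses the context)

bad = MiniConnection.new(-> { raise 'No NATS URLs configured' })
begin
  bad.connect!
rescue RuntimeError => e
  puts "#{bad.state}: #{e.message}" # → failed: No NATS URLs configured
end
```

Re-raising after cleanup lets the caller decide how to react, while the object itself is left in a consistent `failed` state rather than a half-connected one.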
#connected?(skip_cache: false) ⇒ Boolean
Public API for checking connection status
Uses cached health check result to avoid excessive network calls. Cache expires after 30 seconds.
Thread-safe: Cache updates are synchronized to prevent race conditions.
# File 'lib/jetstream_bridge/core/connection.rb', line 113
def connected?(skip_cache: false)
  return false unless @nc&.connected?
  return false unless @jts

  # Use cached result if available and fresh
  now = Time.now.to_i
  return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

  # Thread-safe cache update to prevent race conditions
  @@connection_lock.synchronize do
    # Double-check after acquiring lock (another thread may have updated)
    now = Time.now.to_i
    return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

    # Perform actual health check
    @cached_health_status = jetstream_healthy?
    @last_health_check = now
    @cached_health_status
  end
end
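The caching behaviour described above (a 30-second TTL, a `skip_cache:` bypass, and a second freshness check under the lock) can be sketched as a standalone class. `HealthCache`, the `probe` callable, and the injectable `clock` are hypothetical names chosen for this example; the clock is injected only so that expiry can be demonstrated without sleeping.

```ruby
# Hypothetical cache wrapper; the 30-second TTL matches the description above,
# everything else (class name, probe callable, injectable clock) is illustrative.
class HealthCache
  TTL_SECONDS = 30

  def initialize(probe, clock: -> { Time.now.to_i })
    @probe = probe
    @clock = clock
    @lock = Mutex.new
    @status = nil
    @checked_at = nil
  end

  def healthy?(skip_cache: false)
    return @status if !skip_cache && fresh?

    @lock.synchronize do
      # Re-check under the lock: another thread may have refreshed the cache
      return @status if !skip_cache && fresh?

      @status = @probe.call
      @checked_at = @clock.call
      @status
    end
  end

  private

  def fresh?
    !@checked_at.nil? && (@clock.call - @checked_at) < TTL_SECONDS
  end
end

calls = 0
now = 1_000
cache = HealthCache.new(-> { calls += 1; true }, clock: -> { now })

cache.healthy?                   # probes (cache is empty)
cache.healthy?                   # served from the cache
now += 31
cache.healthy?                   # probes again (cache expired)
cache.healthy?(skip_cache: true) # always probes
puts calls # → 3
```

The unlocked first check keeps the common path cheap; the locked second check prevents several threads from running the probe back to back when the cache expires.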
#state ⇒ Symbol
Get current connection state
# File 'lib/jetstream_bridge/core/connection.rb', line 144
def state
  return State::DISCONNECTED unless @nc
  return State::FAILED if @last_reconnect_error && !@nc.connected?
  return State::RECONNECTING if @reconnecting

  @nc.connected? ? (@state || State::CONNECTED) : State::DISCONNECTED
end
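The precedence of the checks in the method above is easier to see as a pure function with explicit inputs. This is a sketch under assumptions: the symbol values in this local `State` module are guesses (the page does not list the real values of `JetstreamBridge::Connection::State`), and `FakeClient` and `derive_state` are hypothetical names.

```ruby
# Hypothetical state values; the real ones live in JetstreamBridge::Connection::State.
module State
  DISCONNECTED = :disconnected
  CONNECTED    = :connected
  RECONNECTING = :reconnecting
  FAILED       = :failed
end

# Minimal stand-in for the NATS client: only #connected? is needed.
FakeClient = Struct.new(:connected) do
  def connected?
    connected
  end
end

# Same precedence as the method above, written as a pure function.
def derive_state(nc:, last_error: nil, reconnecting: false, recorded: nil)
  return State::DISCONNECTED unless nc
  return State::FAILED if last_error && !nc.connected?
  return State::RECONNECTING if reconnecting

  nc.connected? ? (recorded || State::CONNECTED) : State::DISCONNECTED
end

up = FakeClient.new(true)
down = FakeClient.new(false)

puts derive_state(nc: nil)                      # → disconnected
puts derive_state(nc: down, last_error: 'boom') # → failed
puts derive_state(nc: up, reconnecting: true)   # → reconnecting
puts derive_state(nc: up, last_error: 'boom')   # → connected
```

Note the last case: a recorded reconnect error only yields `failed` while the client is down, so a stale error does not mask a connection that has since recovered.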