Class: JetstreamBridge::Connection
- Inherits:
-
Object
- Object
- JetstreamBridge::Connection
- Includes:
- Singleton
- Defined in:
- lib/jetstream_bridge/core/connection.rb
Overview
Singleton connection to NATS with thread-safe initialization.
This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.
Thread Safety:
-
Connection initialization is synchronized with a mutex
-
The singleton pattern ensures only one connection instance exists
-
Safe to call from multiple threads/workers simultaneously
Example:
# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)
Defined Under Namespace
Modules: State
Constant Summary collapse
- DEFAULT_CONN_OPTS =
{ reconnect: true, reconnect_time_wait: 2, max_reconnect_attempts: 10, connect_timeout: 5 }.freeze
- VALID_NATS_SCHEMES =
%w[nats nats+tls].freeze
- REFRESH_RETRY_BASE_DELAY =
0.01
- REFRESH_RETRY_MAX_DELAY =
30.0- REFRESH_RETRY_MAX_ATTEMPTS =
30- @@connection_lock =
Class-level mutex for thread-safe connection initialization Using class variable to avoid race condition in mutex creation rubocop:disable Style/ClassVars
Mutex.new
Instance Attribute Summary collapse
-
#connected_at ⇒ Time?
readonly
Public API for getting connection timestamp.
-
#last_reconnect_error ⇒ Object
readonly
Last reconnection error metadata (exposed for health checks/diagnostics).
-
#last_reconnect_error_at ⇒ Object
readonly
Last reconnection error metadata (exposed for health checks/diagnostics).
Class Method Summary collapse
-
.connect!(verify_js: nil) ⇒ NATS::JetStream
Thread-safe delegator to the singleton instance.
-
.jetstream ⇒ NATS::JetStream?
Returns the JetStream context from the singleton instance.
-
.nc ⇒ NATS::IO::Client?
Returns the raw NATS client from the singleton instance.
Instance Method Summary collapse
-
#connect!(verify_js: nil) ⇒ NATS::JetStream
Idempotent: returns an existing, healthy JetStream context or establishes one.
-
#connected?(skip_cache: false) ⇒ Boolean
Public API for checking connection status.
-
#state ⇒ Symbol
Get current connection state.
Instance Attribute Details
#connected_at ⇒ Time? (readonly)
Public API for getting connection timestamp
148 149 150 |
# File 'lib/jetstream_bridge/core/connection.rb', line 148 def connected_at @connected_at end |
#last_reconnect_error ⇒ Object (readonly)
Last reconnection error metadata (exposed for health checks/diagnostics)
151 152 153 |
# File 'lib/jetstream_bridge/core/connection.rb', line 151 def last_reconnect_error @last_reconnect_error end |
#last_reconnect_error_at ⇒ Object (readonly)
Last reconnection error metadata (exposed for health checks/diagnostics)
151 152 153 |
# File 'lib/jetstream_bridge/core/connection.rb', line 151 def last_reconnect_error_at @last_reconnect_error_at end |
Class Method Details
.connect!(verify_js: nil) ⇒ NATS::JetStream
Thread-safe delegator to the singleton instance. Returns a live JetStream context.
Safe to call from multiple threads - uses class-level mutex for synchronization.
64 65 66 |
# File 'lib/jetstream_bridge/core/connection.rb', line 64 def connect!(verify_js: nil) @@connection_lock.synchronize { instance.connect!(verify_js: verify_js) } end |
.jetstream ⇒ NATS::JetStream?
Returns the JetStream context from the singleton instance.
79 80 81 |
# File 'lib/jetstream_bridge/core/connection.rb', line 79 def jetstream instance.__send__(:jetstream) end |
.nc ⇒ NATS::IO::Client?
Returns the raw NATS client from the singleton instance.
71 72 73 |
# File 'lib/jetstream_bridge/core/connection.rb', line 71 def nc instance.__send__(:nc) end |
Instance Method Details
#connect!(verify_js: nil) ⇒ NATS::JetStream
Idempotent: returns an existing, healthy JetStream context or establishes one.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/jetstream_bridge/core/connection.rb', line 90 def connect!(verify_js: nil) verify_js = config_auto_provision if verify_js.nil? # Check if already connected without acquiring mutex (for performance) return @jts if @jts && @nc&.connected? servers = nats_servers raise 'No NATS URLs configured' if servers.empty? @state = State::CONNECTING establish_connection_with_retry(servers, verify_js: verify_js) Logging.info( "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \ "#{sanitize_urls(servers).join(', ')}", tag: 'JetstreamBridge::Connection' ) @connected_at = Time.now.utc @state = State::CONNECTED @jts rescue StandardError @state = State::FAILED cleanup_connection! raise end |
#connected?(skip_cache: false) ⇒ Boolean
Public API for checking connection status
Uses cached health check result to avoid excessive network calls. Cache expires after 30 seconds.
Thread-safe: Cache updates are synchronized to prevent race conditions.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/jetstream_bridge/core/connection.rb', line 125 def connected?(skip_cache: false) return false unless @nc&.connected? return false unless @jts # Use cached result if available and fresh now = Time.now.to_i return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30 # Thread-safe cache update to prevent race conditions @@connection_lock.synchronize do # Double-check after acquiring lock (another thread may have updated) now = Time.now.to_i return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30 # Perform actual health check (management APIs optional) @cached_health_status = jetstream_healthy?(verify_js: config_auto_provision) @last_health_check = now @cached_health_status end end |
#state ⇒ Symbol
Get current connection state
156 157 158 159 160 161 162 |
# File 'lib/jetstream_bridge/core/connection.rb', line 156 def state return State::DISCONNECTED unless @nc return State::FAILED if @last_reconnect_error && !@nc.connected? return State::RECONNECTING if @reconnecting @nc.connected? ? (@state || State::CONNECTED) : State::DISCONNECTED end |