Class: JetstreamBridge::Connection

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/jetstream_bridge/core/connection.rb

Overview

Singleton connection to NATS with thread-safe initialization.

This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.

Thread Safety:

  • Connection initialization is synchronized with a mutex

  • The singleton pattern ensures only one connection instance exists

  • Safe to call from multiple threads/workers simultaneously

Example:

# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)

Defined Under Namespace

Modules: State

Constant Summary collapse

DEFAULT_CONN_OPTS =
{
  reconnect: true,
  reconnect_time_wait: 2,
  max_reconnect_attempts: 10,
  connect_timeout: 5
}.freeze
VALID_NATS_SCHEMES =
%w[nats nats+tls].freeze
REFRESH_RETRY_BASE_DELAY =
0.01
REFRESH_RETRY_MAX_DELAY =
30.0
REFRESH_RETRY_MAX_ATTEMPTS =
30
@@connection_lock =

Class-level mutex for thread-safe connection initialization Using class variable to avoid race condition in mutex creation rubocop:disable Style/ClassVars

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connected_atTime? (readonly)

Public API for getting connection timestamp



148
149
150
# File 'lib/jetstream_bridge/core/connection.rb', line 148

def connected_at
  @connected_at
end

#last_reconnect_errorObject (readonly)

Last reconnection error metadata (exposed for health checks/diagnostics)



151
152
153
# File 'lib/jetstream_bridge/core/connection.rb', line 151

def last_reconnect_error
  @last_reconnect_error
end

#last_reconnect_error_atObject (readonly)

Last reconnection error metadata (exposed for health checks/diagnostics)



151
152
153
# File 'lib/jetstream_bridge/core/connection.rb', line 151

def last_reconnect_error_at
  @last_reconnect_error_at
end

Class Method Details

.connect!(verify_js: nil) ⇒ NATS::JetStream

Thread-safe delegator to the singleton instance. Returns a live JetStream context.

Safe to call from multiple threads - uses class-level mutex for synchronization.



64
65
66
# File 'lib/jetstream_bridge/core/connection.rb', line 64

def connect!(verify_js: nil)
  @@connection_lock.synchronize { instance.connect!(verify_js: verify_js) }
end

.jetstreamNATS::JetStream?

Returns the JetStream context from the singleton instance.

Raises:



79
80
81
# File 'lib/jetstream_bridge/core/connection.rb', line 79

def jetstream
  instance.__send__(:jetstream)
end

.ncNATS::IO::Client?

Returns the raw NATS client from the singleton instance.



71
72
73
# File 'lib/jetstream_bridge/core/connection.rb', line 71

def nc
  instance.__send__(:nc)
end

Instance Method Details

#connect!(verify_js: nil) ⇒ NATS::JetStream

Idempotent: returns an existing, healthy JetStream context or establishes one.

Raises:



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/jetstream_bridge/core/connection.rb', line 90

def connect!(verify_js: nil)
  verify_js = config_auto_provision if verify_js.nil?
  # Check if already connected without acquiring mutex (for performance)
  return @jts if @jts && @nc&.connected?

  servers = nats_servers
  raise 'No NATS URLs configured' if servers.empty?

  @state = State::CONNECTING
  establish_connection_with_retry(servers, verify_js: verify_js)

  Logging.info(
    "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \
    "#{sanitize_urls(servers).join(', ')}",
    tag: 'JetstreamBridge::Connection'
  )

  @connected_at = Time.now.utc
  @state = State::CONNECTED
  @jts
rescue StandardError
  @state = State::FAILED
  cleanup_connection!
  raise
end

#connected?(skip_cache: false) ⇒ Boolean

Public API for checking connection status

Uses cached health check result to avoid excessive network calls. Cache expires after 30 seconds.

Thread-safe: Cache updates are synchronized to prevent race conditions.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/jetstream_bridge/core/connection.rb', line 125

def connected?(skip_cache: false)
  return false unless @nc&.connected?
  return false unless @jts

  # Use cached result if available and fresh
  now = Time.now.to_i
  return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

  # Thread-safe cache update to prevent race conditions
  @@connection_lock.synchronize do
    # Double-check after acquiring lock (another thread may have updated)
    now = Time.now.to_i
    return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

    # Perform actual health check (management APIs optional)
    @cached_health_status = jetstream_healthy?(verify_js: config_auto_provision)
    @last_health_check = now
    @cached_health_status
  end
end

#stateSymbol

Get current connection state



156
157
158
159
160
161
162
# File 'lib/jetstream_bridge/core/connection.rb', line 156

def state
  return State::DISCONNECTED unless @nc
  return State::FAILED if @last_reconnect_error && !@nc.connected?
  return State::RECONNECTING if @reconnecting

  @nc.connected? ? (@state || State::CONNECTED) : State::DISCONNECTED
end