Class: JetstreamBridge::Connection

Inherits:
Object
  • Object
Includes:
Singleton
Defined in:
lib/jetstream_bridge/core/connection.rb

Overview

Singleton connection to NATS with thread-safe initialization.

This class manages a single NATS connection for the entire application, ensuring thread-safe access in multi-threaded environments like Rails with Puma or Sidekiq.

Thread Safety:

  • Connection initialization is synchronized with a mutex

  • The singleton pattern ensures only one connection instance exists

  • Safe to call from multiple threads/workers simultaneously

Example:

# Safe from any thread
jts = JetstreamBridge::Connection.connect!
jts.publish(...)
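
The thread-safety guarantees above can be illustrated with a minimal sketch. `DemoConnection` is a hypothetical stand-in, not the real class: it talks to no NATS server and only mimics the shape of a singleton whose class-level `connect!` is serialized by an eagerly created mutex.

```ruby
require 'singleton'

# Hypothetical stand-in for illustration only; no NATS client is involved.
class DemoConnection
  include Singleton

  # Created once when the class body is evaluated, so threads never race
  # to create the lock itself.
  @@lock = Mutex.new

  attr_reader :connect_count

  # Class-level entry point: every caller goes through the same mutex.
  def self.connect!
    @@lock.synchronize { instance.connect! }
  end

  def initialize
    @connect_count = 0
    @handle = nil
  end

  # Idempotent: returns the existing handle or creates one.
  def connect!
    return @handle if @handle

    sleep 0.01 # widen the window in which unsynchronized callers would collide
    @connect_count += 1
    @handle = Object.new
  end
end

# Eight threads ask for a connection at once; all receive the same handle.
handles = Array.new(8) { Thread.new { DemoConnection.connect! } }.map(&:value)
puts "distinct handles: #{handles.uniq.size}, connects: #{DemoConnection.instance.connect_count}"
```

Because the lock exists before any thread can call `connect!`, the only contention is over who establishes the connection first; later callers find the handle already set.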

Defined Under Namespace

Modules: State

Constant Summary

DEFAULT_CONN_OPTS =
{
  reconnect: true,
  reconnect_time_wait: 2,
  max_reconnect_attempts: 10,
  connect_timeout: 5
}.freeze
VALID_NATS_SCHEMES =
%w[nats nats+tls].freeze
@@connection_lock =

Class-level mutex for thread-safe connection initialization. A class variable is used to avoid a race condition in mutex creation.

Mutex.new
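
How the library itself applies these constants is not shown on this page. As a rough sketch only, constants like these could be used to merge caller overrides into the defaults and to reject URLs with an unsupported scheme. The module `ConnConfigSketch` and its helpers `build_options` and `valid_nats_url?` are hypothetical names introduced for illustration.

```ruby
require 'uri'

# Hypothetical module; the constant values are copied from the summary above.
module ConnConfigSketch
  DEFAULT_CONN_OPTS = {
    reconnect: true,
    reconnect_time_wait: 2,
    max_reconnect_attempts: 10,
    connect_timeout: 5
  }.freeze
  VALID_NATS_SCHEMES = %w[nats nats+tls].freeze

  module_function

  # Caller-supplied options win over the defaults; the defaults stay frozen.
  def build_options(overrides = {})
    DEFAULT_CONN_OPTS.merge(overrides)
  end

  # Accepts only URLs that parse, carry an allowed scheme, and name a host.
  def valid_nats_url?(url)
    uri = URI.parse(url)
    VALID_NATS_SCHEMES.include?(uri.scheme) && !uri.host.to_s.empty?
  rescue URI::InvalidURIError
    false
  end
end

puts ConnConfigSketch.build_options(connect_timeout: 1).inspect
puts ConnConfigSketch.valid_nats_url?('nats://localhost:4222')
```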


Instance Attribute Details

#connected_at ⇒ Time? (readonly)

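Public API for getting the connection timestamp: the UTC time at which the connection was established, or nil if no connection has been established yet.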



# File 'lib/jetstream_bridge/core/connection.rb', line 136

def connected_at
  @connected_at
end

#last_reconnect_error ⇒ Object (readonly)

Last reconnection error metadata (exposed for health checks/diagnostics)



# File 'lib/jetstream_bridge/core/connection.rb', line 139

def last_reconnect_error
  @last_reconnect_error
end

#last_reconnect_error_at ⇒ Object (readonly)

Last reconnection error metadata: the time at which the last reconnection error occurred (exposed for health checks/diagnostics)



# File 'lib/jetstream_bridge/core/connection.rb', line 139

def last_reconnect_error_at
  @last_reconnect_error_at
end

Class Method Details

.connect! ⇒ NATS::JetStream::JS

Thread-safe delegator to the singleton instance. Returns a live JetStream context.

Safe to call from multiple threads: a class-level mutex serializes calls to the instance method.



# File 'lib/jetstream_bridge/core/connection.rb', line 61

def connect!
  @@connection_lock.synchronize { instance.connect! }
end

.jetstream ⇒ Object



# File 'lib/jetstream_bridge/core/connection.rb', line 70

def jetstream
  instance.__send__(:jetstream)
end

.nc ⇒ Object

Optional accessors if callers need raw handles



# File 'lib/jetstream_bridge/core/connection.rb', line 66

def nc
  instance.__send__(:nc)
end

Instance Method Details

#connect! ⇒ Object

Idempotent: returns an existing, healthy JetStream context or establishes one.



# File 'lib/jetstream_bridge/core/connection.rb', line 76

def connect!
  # Check if already connected without acquiring mutex (for performance)
  return @jts if @jts && @nc&.connected?

  servers = nats_servers
  raise 'No NATS URLs configured' if servers.empty?

  @state = State::CONNECTING
  establish_connection_with_retry(servers)

  Logging.info(
    "Connected to NATS (#{servers.size} server#{'s' unless servers.size == 1}): " \
    "#{sanitize_urls(servers).join(', ')}",
    tag: 'JetstreamBridge::Connection'
  )

  # Ensure topology (streams, subjects, overlap guard, etc.)
  Topology.ensure!(@jts)

  @connected_at = Time.now.utc
  @state = State::CONNECTED
  @jts
rescue StandardError
  @state = State::FAILED
  cleanup_connection!
  raise
end

#connected?(skip_cache: false) ⇒ Boolean

Public API for checking connection status

Uses cached health check result to avoid excessive network calls. Cache expires after 30 seconds.

Thread-safe: Cache updates are synchronized to prevent race conditions.



# File 'lib/jetstream_bridge/core/connection.rb', line 113

def connected?(skip_cache: false)
  return false unless @nc&.connected?
  return false unless @jts

  # Use cached result if available and fresh
  now = Time.now.to_i
  return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

  # Thread-safe cache update to prevent race conditions
  @@connection_lock.synchronize do
    # Double-check after acquiring lock (another thread may have updated)
    now = Time.now.to_i
    return @cached_health_status if !skip_cache && @last_health_check && (now - @last_health_check) < 30

    # Perform actual health check
    @cached_health_status = jetstream_healthy?
    @last_health_check = now
    @cached_health_status
  end
end
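
The caching behaviour of `connected?` follows a double-checked pattern: read the cache without the lock, then re-check it after acquiring the lock before running the expensive probe. The sketch below isolates that pattern. `CachedHealth`, its injectable `clock`, and the `probe_count` counter are hypothetical names added so the behaviour can be observed without a NATS server.

```ruby
# Hypothetical helper class; mirrors the 30-second cache described above.
class CachedHealth
  TTL = 30 # seconds

  attr_reader :probe_count

  # clock: callable returning the current time in whole seconds (injectable for tests)
  # probe: block performing the real (expensive) health check
  def initialize(clock: -> { Time.now.to_i }, &probe)
    @lock = Mutex.new
    @clock = clock
    @probe = probe
    @probe_count = 0
    @last_check = nil
    @status = nil
  end

  def healthy?(skip_cache: false)
    # Fast path: serve a fresh cached result without taking the lock.
    return @status if !skip_cache && fresh?

    @lock.synchronize do
      # Re-check: another thread may have refreshed the cache while we waited.
      return @status if !skip_cache && fresh?

      @probe_count += 1
      @status = @probe.call
      @last_check = @clock.call
      @status
    end
  end

  private

  def fresh?
    !@last_check.nil? && (@clock.call - @last_check) < TTL
  end
end

now = 1_000
health = CachedHealth.new(clock: -> { now }) { true }
health.healthy?                   # runs the probe
health.healthy?                   # served from cache
now += 31
health.healthy?                   # cache expired, probe runs again
health.healthy?(skip_cache: true) # bypasses the cache
puts health.probe_count
```

Passing `skip_cache: true` forces a fresh probe even when the cached value is still within its 30-second window, which suits an explicit health endpoint.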

#state ⇒ Symbol

Get current connection state



# File 'lib/jetstream_bridge/core/connection.rb', line 144

def state
  return State::DISCONNECTED unless @nc
  return State::FAILED if @last_reconnect_error && !@nc.connected?
  return State::RECONNECTING if @reconnecting

  @nc.connected? ? (@state || State::CONNECTED) : State::DISCONNECTED
end