Method: NATS.connect

Defined in:
lib/nats/client.rb

.connect(uri = nil, opts = {}, &blk) ⇒ NATS

Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.

Examples:

Connect to local NATS server.

NATS.connect do |nc|
  # ...
end

Setting custom server URI to connect.

NATS.connect("nats://localhost:4222") do |nc|
  # ...
end

Setting username and password to authenticate.

NATS.connect("nats://user:password@localhost:4222") do |nc|
  # ...
end

Specifying explicit list of servers via options.

NATS.connect(servers: ["nats://127.0.0.1:4222","nats://127.0.0.1:4223","nats://127.0.0.1:4224"]) do |nc|
  # ...
end

Using comma separated array to define list of servers.

NATS.connect("nats://localhost:4223,nats://localhost:4224") do |nc|
  # ...
end

Only specifying endpoint uses NATS default scheme and port.

NATS.connect("demo.nats.io") do |nc|
  # ...
end

Setting infinite reconnect retries with 2 seconds back off against custom URI.

NATS.connect("demo.nats.io:4222", max_reconnect_attempts: -1, reconnect_time_wait: 2) do |nc|
  # ...
end

Parameters:

  • uri (String) (defaults to: nil)

    The URI or comma separated list of URIs of NATS servers to connect to.

  • opts (Hash) (defaults to: {})
  • &blk (Block)

    called when the connection is completed. Connection will be passed to the block.

Options Hash (opts):

  • :uri (String|URI)

    The URI to connect to, example nats://localhost:4222

  • :reconnect (Boolean)

    Boolean that can be used to suppress reconnect functionality.

  • :debug (Boolean)

    Boolean that can be used to output additional debug information.

  • :verbose (Boolean)

    Boolean that is sent to server for setting verbose protocol mode.

  • :pedantic (Boolean)

    Boolean that is sent to server for setting pedantic mode.

  • :ssl (Boolean)

    Boolean that is sent to server for setting TLS/SSL mode.

  • :tls (Hash)

    Map of options for configuring secure connection handled to EM#start_tls directly.

  • :max_reconnect_attempts (Integer)

    Integer that can be used to set the max number of reconnect tries

  • :reconnect_time_wait (Integer)

    Integer that can be used to set the number of seconds to wait between reconnect tries

  • :ping_interval (Integer)

    Integer that can be used to set the ping interval in seconds.

  • :max_outstanding_pings (Integer)

    Integer that can be used to set the max number of outstanding pings before declaring a connection closed.

Returns:

  • (NATS)

    connection to the server.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/nats/client.rb', line 151

def connect(uri=nil, opts={}, &blk)
  case uri
  when String
    # Initialize TLS defaults in case any url is using it.
    uris = opts[:uri] = process_uri(uri)
    opts[:tls] ||= {} if uris.any? {|u| u.scheme == 'tls'}
  when Hash
    opts = uri
  end

  # Defaults
  opts[:verbose] = false if opts[:verbose].nil?
  opts[:pedantic] = false if opts[:pedantic].nil?
  opts[:reconnect] = true if opts[:reconnect].nil?
  opts[:ssl] = false if opts[:ssl].nil?
  opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil?
  opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?
  opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil?
  opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil?
  opts[:drain_timeout] = DEFAULT_DRAIN_TIMEOUT if opts[:drain_timeout].nil?

  # Override with ENV
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI
  opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil?
  opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil?
  opts[:debug] = ENV['NATS_DEBUG'].downcase == 'true' unless ENV['NATS_DEBUG'].nil?
  opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil?
  opts[:fast_producer_error] = ENV['NATS_FAST_PRODUCER'].downcase == 'true' unless ENV['NATS_FAST_PRODUCER'].nil?
  opts[:ssl] = ENV['NATS_SSL'].downcase == 'true' unless ENV['NATS_SSL'].nil?
  opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil?
  opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil?
  opts[:name] ||= ENV['NATS_CONNECTION_NAME']
  opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false
  opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil?
  opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil?
  opts[:drain_timeout] ||= ENV['NATS_DRAIN_TIMEOUT'].to_i unless ENV['NATS_DRAIN_TIMEOUT'].nil?

  uri = opts[:uris] || opts[:servers] || opts[:uri]

  if opts[:tls]
    case
    when opts[:tls][:ca_file]
      # Ensure that the file exists before going further
      # in order to report configuration errors during
      # connect synchronously.
      if !File.readable?(opts[:tls][:ca_file])
        raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % opts[:tls][:ca_file])
      end

      # Certificate is supplied so assume we mean verification by default,
      # but still allow disabling explicitly by setting to false.
      opts[:tls][:verify_peer] ||= true
    when (opts[:tls][:verify_peer] && !opts[:tls][:ca_file])
      raise(Error, "TLS Verification is enabled but ca_file is not set")
    else
      # Otherwise, disable verifying peer by default,
      # thus never reaching EM#ssl_verify_peer
      opts[:tls][:verify_peer] = false
    end

    # Allow overriding directly but default to those which server supports.
    opts[:tls][:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
    opts[:tls][:protocols]   ||= %w(tlsv1 tlsv1_1 tlsv1_2)
  end

  # If they pass an array here just pass along to the real connection, and use first as the first attempt..
  # Real connection will do proper walk throughs etc..
  unless uri.nil?
    uris = uri.kind_of?(Array) ? uri : [uri]
    uris.shuffle! unless opts[:dont_randomize_servers]
    u = uris.first
    @uri = u.is_a?(URI) ? u.dup : URI.parse(u)
  end

  @err_cb = proc { |e| raise e } unless err_cb
  @close_cb = proc { } unless close_cb
  @disconnect_cb = proc { } unless disconnect_cb

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  return client
end