Method: NATS.connect
- Defined in:
- lib/nats/client.rb
.connect(uri = nil, opts = {}, &blk) ⇒ NATS
Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/nats/client.rb', line 151

# Create and return a connection to the server with the given options.
# The optional block is registered via +on_connect+ on the returned client.
#
# Option resolution happens in three steps:
# 1. Missing (nil) options are filled in with library defaults.
# 2. NATS_* environment variables, when set, override the resulting values
#    (except :uri, :name and :no_echo, which only fall back to the
#    environment when not already set).
# 3. TLS options are validated and completed when opts[:tls] is present.
#
# @param uri [String, Hash, nil] server URI string (handed to process_uri),
#   or an options Hash that is then used in place of +opts+.
# @param opts [Hash] connection options; this hash is modified in place.
# @param blk [Proc] optional callback passed to +on_connect+.
# @return the client object returned by EM.connect.
# @raise [Error] if TLS verification is requested without a ca_file, or the
#   configured ca_file is not readable.
def connect(uri=nil, opts={}, &blk)
  case uri
  when String
    # Initialize TLS defaults in case any url is using it.
    uris = opts[:uri] = process_uri(uri)
    opts[:tls] ||= {} if uris.any? { |u| u.scheme == 'tls' }
  when Hash
    opts = uri
  end

  # Defaults: only applied when the option is nil, so an explicit +false+
  # from the caller is preserved.
  {
    verbose: false,
    pedantic: false,
    reconnect: true,
    ssl: false,
    max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
    reconnect_time_wait: RECONNECT_TIME_WAIT,
    ping_interval: DEFAULT_PING_INTERVAL,
    max_outstanding_pings: DEFAULT_PING_MAX,
    drain_timeout: DEFAULT_DRAIN_TIMEOUT
  }.each do |key, default|
    opts[key] = default if opts[key].nil?
  end

  # Override with ENV
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI

  # Boolean switches: enabled only when the variable equals "true"
  # (case-insensitive).
  {
    verbose: 'NATS_VERBOSE',
    pedantic: 'NATS_PEDANTIC',
    debug: 'NATS_DEBUG',
    reconnect: 'NATS_RECONNECT',
    fast_producer_error: 'NATS_FAST_PRODUCER',
    ssl: 'NATS_SSL'
  }.each do |key, var|
    opts[key] = ENV[var].downcase == 'true' unless ENV[var].nil?
  end

  opts[:name] ||= ENV['NATS_CONNECTION_NAME']
  opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false

  # Integer settings. :drain_timeout is assigned unconditionally like the
  # others: a default has already been applied above, so a conditional
  # assignment (||=) would never let NATS_DRAIN_TIMEOUT take effect.
  {
    max_reconnect_attempts: 'NATS_MAX_RECONNECT_ATTEMPTS',
    reconnect_time_wait: 'NATS_RECONNECT_TIME_WAIT',
    ping_interval: 'NATS_PING_INTERVAL',
    max_outstanding_pings: 'NATS_MAX_OUTSTANDING_PINGS',
    drain_timeout: 'NATS_DRAIN_TIMEOUT'
  }.each do |key, var|
    opts[key] = ENV[var].to_i unless ENV[var].nil?
  end

  uri = opts[:uris] || opts[:servers] || opts[:uri]

  if (tls = opts[:tls])
    if tls[:ca_file]
      # Ensure that the file exists before going further
      # in order to report configuration errors during
      # connect synchronously.
      unless File.readable?(tls[:ca_file])
        raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % tls[:ca_file])
      end

      # Certificate is supplied so assume we mean verification by default,
      # but still allow disabling explicitly by setting to false.
      # (A nil check is required here: `||= true` would clobber an
      # explicit false.)
      tls[:verify_peer] = true if tls[:verify_peer].nil?
    elsif tls[:verify_peer]
      raise(Error, "TLS Verification is enabled but ca_file is not set")
    else
      # Otherwise, disable verifying peer by default,
      # thus never reaching EM#ssl_verify_peer
      tls[:verify_peer] = false
    end

    # Allow overriding directly but default to those which server supports.
    tls[:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
    tls[:protocols] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
  end

  # If they pass an array here just pass along to the real connection, and use first as the first attempt..
  # Real connection will do proper walk throughs etc..
  unless uri.nil?
    uris = uri.kind_of?(Array) ? uri : [uri]
    uris.shuffle! unless opts[:dont_randomize_servers]
    u = uris.first
    @uri = u.is_a?(URI) ? u.dup : URI.parse(u)
  end

  # Install fallback callbacks only when none have been registered yet.
  @err_cb = proc { |e| raise e } unless err_cb
  @close_cb = proc { } unless close_cb
  @disconnect_cb = proc { } unless disconnect_cb

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  return client
end