Module: NATS
- Defined in:
- lib/nats/version.rb,
lib/nats/nuid.rb,
lib/nats/client.rb more...
Overview
Copyright 2010-2018 The NATS Authors Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Defined Under Namespace
Classes: AuthError, ClientError, ConnectError, Error, MonotonicTime, NUID, ServerError
Constant Summary collapse
- DEFAULT_PORT =
4222
- DEFAULT_URI =
"nats://localhost:#{DEFAULT_PORT}".freeze
- MAX_RECONNECT_ATTEMPTS =
10
- RECONNECT_TIME_WAIT =
2
- MAX_PENDING_SIZE =
32768
- FAST_PRODUCER_THRESHOLD =
Maximum outbound size per client to trigger FP, 20MB
(10*1024*1024)
- DEFAULT_PING_INTERVAL =
Ping intervals
120
- DEFAULT_PING_MAX =
2
- DEFAULT_DRAIN_TIMEOUT =
Drain mode support
30
- MSG =
Protocol
/\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i
- OK =
:nodoc:
/\A\+OK\s*\r\n/i
- ERR =
:nodoc:
/\A-ERR\s+('.+')?\r\n/i
- PING =
:nodoc:
/\APING\s*\r\n/i
- PONG =
:nodoc:
/\APONG\s*\r\n/i
- INFO =
:nodoc:
/\AINFO\s+([^\r\n]+)\r\n/i
- UNKNOWN =
:nodoc:
/\A(.*)\r\n/
- CR_LF =
Responses
("\r\n".freeze)
- CR_LF_SIZE =
:nodoc:
(CR_LF.bytesize)
- PING_REQUEST =
:nodoc:
("PING#{CR_LF}".freeze)
- PONG_RESPONSE =
:nodoc:
("PONG#{CR_LF}".freeze)
- SUB_OP =
:nodoc:
('SUB'.freeze)
- EMPTY_MSG =
:nodoc:
(''.freeze)
- SUB =
Used for future pedantic Mode
/^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/
- SUB_NO_WC =
:nodoc:
/^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/
- AWAITING_CONTROL_LINE =
Parser
1
- AWAITING_MSG_PAYLOAD =
:nodoc:
2
- AWAITING_INFO_LINE =
:nodoc:
3
- VERSION =
NOTE: These are all announced to the server on CONNECT
"0.11.0".freeze
- LANG =
RUBY_ENGINE
- PROTOCOL_VERSION =
1
Class Attribute Summary collapse
-
.client ⇒ Object
readonly
:nodoc:.
-
.close_cb ⇒ Object
readonly
:nodoc.
-
.disconnect_cb ⇒ Object
readonly
:nodoc.
-
.err_cb ⇒ Object
readonly
:nodoc:.
-
.err_cb_overridden ⇒ Object
readonly
:nodoc:.
-
.reactor_was_running ⇒ Object
(also: reactor_was_running?)
readonly
:nodoc:.
-
.reconnect_cb ⇒ Object
readonly
:nodoc.
Instance Attribute Summary collapse
-
#bytes_received ⇒ Object
readonly
Returns the value of attribute bytes_received.
-
#bytes_sent ⇒ Object
readonly
Returns the value of attribute bytes_sent.
-
#close_cb ⇒ Object
readonly
Returns the value of attribute close_cb.
-
#closing ⇒ Object
(also: #closing?)
readonly
:nodoc.
-
#connect_cb ⇒ Object
readonly
:nodoc:.
-
#connected ⇒ Object
(also: #connected?)
readonly
:nodoc:.
-
#disconnect_cb ⇒ Object
readonly
Returns the value of attribute disconnect_cb.
-
#draining ⇒ Object
(also: #draining?)
readonly
:nodoc.
-
#err_cb ⇒ Object
readonly
:nodoc:.
-
#err_cb_overridden ⇒ Object
readonly
:nodoc:.
-
#msgs_received ⇒ Object
readonly
Returns the value of attribute msgs_received.
-
#msgs_sent ⇒ Object
readonly
Returns the value of attribute msgs_sent.
-
#options ⇒ Object
readonly
:nodoc.
-
#pings ⇒ Object
readonly
Returns the value of attribute pings.
-
#pongs_received ⇒ Object
readonly
:nodoc:.
-
#reconnecting ⇒ Object
(also: #reconnecting?)
readonly
:nodoc.
-
#server_info ⇒ Object
readonly
:nodoc.
-
#server_pool ⇒ Object
readonly
:nodoc.
Class Method Summary collapse
-
.clear_client ⇒ Object
:nodoc:.
-
.connect(uri = nil, opts = {}, &blk) ⇒ NATS
Create and return a connection to the server with the given options.
-
.connected? ⇒ Boolean
Connected state.
-
.connected_server ⇒ URI
Connected server.
-
.create_inbox ⇒ String
Returns a subject that can be used for “directed” communications.
-
.drain(&blk) ⇒ Object
Drain gracefully disconnects from the server, letting subscribers process pending messages already sent by server and optionally calls the associated block.
-
.draining? ⇒ Boolean
Draining state.
-
.flush(*args, &blk) ⇒ Object
Flushes all messages and subscriptions in the default connection.
-
.on_close(&callback) ⇒ Object
Set the default on_closed callback.
-
.on_disconnect(&callback) ⇒ Object
Set the default on_disconnect callback.
-
.on_error(&callback) ⇒ Object
Set the default on_error callback.
-
.on_reconnect(&callback) ⇒ Object
Set the default on_reconnect callback.
-
.options ⇒ Hash
Options.
-
.pending_data_size(*args) ⇒ Object
Return bytes outstanding for the default client connection.
-
.publish(*args, &blk) ⇒ Object
Publish a message using the default client connection.
-
.reconnecting? ⇒ Boolean
Reconnecting state.
-
.request(*args, &blk) ⇒ Object
Publish a message and wait for a response on the default client connection.
-
.server_info ⇒ Hash
Server information.
-
.server_running?(uri) ⇒ Boolean
:nodoc:.
-
.start(*args, &blk) ⇒ Object
Create a default client connection to the server.
-
.stop(&blk) ⇒ Object
Close the default client connection and optionally call the associated block.
-
.subscribe(*args, &blk) ⇒ Object
Subscribe using the default client connection.
-
.timeout(*args, &blk) ⇒ Object
Set a timeout for receiving messages for the subscription.
-
.unsubscribe(*args) ⇒ Object
Cancel a subscription on the default client connection.
-
.wait_for_server(uri, max_wait = 5) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#attempt_reconnect ⇒ Object
:nodoc:.
- #auth_connection? ⇒ Boolean
-
#bind_primary ⇒ Object
:nodoc:.
-
#can_reuse_server?(server) ⇒ Boolean
:nodoc:.
- #cancel_ping_timer ⇒ Object
- #cancel_reconnect_timer ⇒ Object
- #client_using_secure_connection? ⇒ Boolean
-
#close ⇒ Object
Close the connection to the server.
-
#connect_command ⇒ Object
:nodoc:.
-
#connected_server ⇒ URI
Connected server.
-
#connection_completed ⇒ Object
:nodoc:.
- #disconnect_error_string ⇒ Object
-
#discovered_servers ⇒ Object
Retrieves the list of servers which have been discovered via server connect_urls announcements.
-
#drain(&blk) ⇒ Object
Drain gracefully closes the connection.
-
#flush(&blk) ⇒ Object
Flushes all messages and subscriptions for the connection.
-
#flush_pending ⇒ Object
:nodoc:.
- #had_error? ⇒ Boolean
- #initialize(options) ⇒ Object
-
#inspect ⇒ Object
:nodoc:.
- #multiple_servers_available? ⇒ Boolean
-
#on_close(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
-
#on_connect(&callback) ⇒ Object
Define a callback to be called when the client connection has been established.
-
#on_disconnect(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
-
#on_error(&callback) ⇒ Object
Define a callback to be called when errors occur on the client connection.
-
#on_msg(subject, sid, reply, msg) ⇒ Object
:nodoc:.
-
#on_reconnect(&callback) ⇒ Object
Define a callback to be called when a reconnect attempt is made.
-
#pending_data_size ⇒ Object
Return bytes outstanding waiting to be sent to server.
-
#process_connect ⇒ Object
:nodoc:.
-
#process_connect_init(info) ⇒ Object
:nodoc:.
-
#process_disconnect ⇒ Object
:nodoc:.
-
#process_info(info_line) ⇒ Object
:nodoc:.
- #process_pong ⇒ Object
-
#process_uri_options ⇒ Object
Parse out URIs which can now be an array of server choices The server pool will contain both explicit and implicit members.
-
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
Publish a message to a given subject, with optional reply subject and completion block.
-
#queue_server_rt(&cb) ⇒ Object
:nodoc:.
-
#receive_data(data) ⇒ Object
:nodoc:.
-
#request(subject, data = nil, opts = {}, &cb) ⇒ Object
Send a request and have the response delivered to the supplied callback.
-
#schedule_primary_and_connect ⇒ Object
We have failed on an attempt at the primary (first) server, rotate and try again.
-
#schedule_reconnect ⇒ Object
:nodoc:.
-
#send_command(command, priority = false) ⇒ Object
:nodoc:.
-
#send_connect_command ⇒ Object
:nodoc:.
-
#send_ping ⇒ Object
:nodoc:.
- #server_using_secure_connection? ⇒ Boolean
- #setup_nkeys_connect ⇒ Object
- #should_delay_connect?(server) ⇒ Boolean
- #should_not_reconnect? ⇒ Boolean
- #ssl_handshake_completed ⇒ Object
- #ssl_verify_peer(cert) ⇒ Object
- #start_resp_mux_sub! ⇒ Object
-
#subscribe(subject, opts = {}, &callback) ⇒ Object
Subscribe to a subject with optional wildcards.
-
#subscription_count ⇒ Number
Return the active subscription count.
-
#timeout(sid, timeout, opts = {}, &callback) ⇒ Object
Setup a timeout for receiving messages for the subscription.
-
#unbind ⇒ Object
:nodoc:.
-
#unsubscribe(sid, opt_max = nil) ⇒ Object
Cancel a subscription.
-
#user_err_cb? ⇒ Boolean
:nodoc:.
Class Attribute Details
permalink .client ⇒ Object (readonly)
:nodoc:
92 93 94 |
# File 'lib/nats/client.rb', line 92 def client @client end |
permalink .close_cb ⇒ Object (readonly)
:nodoc
93 94 95 |
# File 'lib/nats/client.rb', line 93 def close_cb @close_cb end |
permalink .disconnect_cb ⇒ Object (readonly)
:nodoc
93 94 95 |
# File 'lib/nats/client.rb', line 93 def disconnect_cb @disconnect_cb end |
permalink .err_cb ⇒ Object (readonly)
:nodoc:
92 93 94 |
# File 'lib/nats/client.rb', line 92 def err_cb @err_cb end |
permalink .err_cb_overridden ⇒ Object (readonly)
:nodoc:
92 93 94 |
# File 'lib/nats/client.rb', line 92 def err_cb_overridden @err_cb_overridden end |
permalink .reactor_was_running ⇒ Object (readonly) Also known as: reactor_was_running?
:nodoc:
92 93 94 |
# File 'lib/nats/client.rb', line 92 def reactor_was_running @reactor_was_running end |
permalink .reconnect_cb ⇒ Object (readonly)
:nodoc
93 94 95 |
# File 'lib/nats/client.rb', line 93 def reconnect_cb @reconnect_cb end |
Instance Attribute Details
permalink #bytes_received ⇒ Object (readonly)
Returns the value of attribute bytes_received.
448 449 450 |
# File 'lib/nats/client.rb', line 448 def bytes_received @bytes_received end |
permalink #bytes_sent ⇒ Object (readonly)
Returns the value of attribute bytes_sent.
448 449 450 |
# File 'lib/nats/client.rb', line 448 def bytes_sent @bytes_sent end |
permalink #close_cb ⇒ Object (readonly)
Returns the value of attribute close_cb.
449 450 451 |
# File 'lib/nats/client.rb', line 449 def close_cb @close_cb end |
permalink #closing ⇒ Object (readonly) Also known as: closing?
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def closing @closing end |
permalink #connect_cb ⇒ Object (readonly)
:nodoc:
446 447 448 |
# File 'lib/nats/client.rb', line 446 def connect_cb @connect_cb end |
permalink #connected ⇒ Object (readonly) Also known as: connected?
:nodoc:
446 447 448 |
# File 'lib/nats/client.rb', line 446 def connected @connected end |
permalink #disconnect_cb ⇒ Object (readonly)
Returns the value of attribute disconnect_cb.
449 450 451 |
# File 'lib/nats/client.rb', line 449 def disconnect_cb @disconnect_cb end |
permalink #draining ⇒ Object (readonly) Also known as: draining?
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def draining @draining end |
permalink #err_cb ⇒ Object (readonly)
:nodoc:
446 447 448 |
# File 'lib/nats/client.rb', line 446 def err_cb @err_cb end |
permalink #err_cb_overridden ⇒ Object (readonly)
:nodoc:
446 447 448 |
# File 'lib/nats/client.rb', line 446 def err_cb_overridden @err_cb_overridden end |
permalink #msgs_received ⇒ Object (readonly)
Returns the value of attribute msgs_received.
448 449 450 |
# File 'lib/nats/client.rb', line 448 def msgs_received @msgs_received end |
permalink #msgs_sent ⇒ Object (readonly)
Returns the value of attribute msgs_sent.
448 449 450 |
# File 'lib/nats/client.rb', line 448 def msgs_sent @msgs_sent end |
permalink #options ⇒ Object (readonly)
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def @options end |
permalink #pings ⇒ Object (readonly)
Returns the value of attribute pings.
448 449 450 |
# File 'lib/nats/client.rb', line 448 def pings @pings end |
permalink #pongs_received ⇒ Object (readonly)
:nodoc:
446 447 448 |
# File 'lib/nats/client.rb', line 446 def pongs_received @pongs_received end |
permalink #reconnecting ⇒ Object (readonly) Also known as: reconnecting?
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def reconnecting @reconnecting end |
permalink #server_info ⇒ Object (readonly)
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def server_info @server_info end |
permalink #server_pool ⇒ Object (readonly)
:nodoc
447 448 449 |
# File 'lib/nats/client.rb', line 447 def server_pool @server_pool end |
Class Method Details
permalink .clear_client ⇒ Object
:nodoc:
403 404 405 |
# File 'lib/nats/client.rb', line 403 def clear_client # :nodoc: @client = nil end |
permalink .connect(uri = nil, opts = {}, &blk) ⇒ NATS
Create and return a connection to the server with the given options. The optional block will be called when the connection has been completed.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/nats/client.rb', line 151 def connect(uri=nil, opts={}, &blk) case uri when String # Initialize TLS defaults in case any url is using it. uris = opts[:uri] = process_uri(uri) opts[:tls] ||= {} if uris.any? {|u| u.scheme == 'tls'} when Hash opts = uri end # Defaults opts[:verbose] = false if opts[:verbose].nil? opts[:pedantic] = false if opts[:pedantic].nil? opts[:reconnect] = true if opts[:reconnect].nil? opts[:ssl] = false if opts[:ssl].nil? opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil? opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil? opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil? opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil? opts[:drain_timeout] = DEFAULT_DRAIN_TIMEOUT if opts[:drain_timeout].nil? # Override with ENV opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil? opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil? opts[:debug] = ENV['NATS_DEBUG'].downcase == 'true' unless ENV['NATS_DEBUG'].nil? opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil? opts[:fast_producer_error] = ENV['NATS_FAST_PRODUCER'].downcase == 'true' unless ENV['NATS_FAST_PRODUCER'].nil? opts[:ssl] = ENV['NATS_SSL'].downcase == 'true' unless ENV['NATS_SSL'].nil? opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil? opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil? opts[:name] ||= ENV['NATS_CONNECTION_NAME'] opts[:no_echo] ||= ENV['NATS_NO_ECHO'] || false opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil? opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil? opts[:drain_timeout] ||= ENV['NATS_DRAIN_TIMEOUT'].to_i unless ENV['NATS_DRAIN_TIMEOUT'].nil? uri = opts[:uris] || opts[:servers] || opts[:uri] if opts[:tls] case when opts[:tls][:ca_file] # Ensure that the file exists before going further # in order to report configuration errors during # connect synchronously. if !File.readable?(opts[:tls][:ca_file]) raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % opts[:tls][:ca_file]) end # Certificate is supplied so assume we mean verification by default, # but still allow disabling explicitly by setting to false. opts[:tls][:verify_peer] ||= true when (opts[:tls][:verify_peer] && !opts[:tls][:ca_file]) raise(Error, "TLS Verification is enabled but ca_file is not set") else # Otherwise, disable verifying peer by default, # thus never reaching EM#ssl_verify_peer opts[:tls][:verify_peer] = false end # Allow overriding directly but default to those which server supports. opts[:tls][:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2) opts[:tls][:protocols] ||= %w(tlsv1 tlsv1_1 tlsv1_2) end # If they pass an array here just pass along to the real connection, and use first as the first attempt.. # Real connection will do proper walk throughs etc.. unless uri.nil? uris = uri.kind_of?(Array) ? uri : [uri] uris.shuffle! unless opts[:dont_randomize_servers] u = uris.first @uri = u.is_a?(URI) ? u.dup : URI.parse(u) end @err_cb = proc { |e| raise e } unless err_cb @close_cb = proc { } unless close_cb @disconnect_cb = proc { } unless disconnect_cb client = EM.connect(@uri.host, @uri.port, self, opts) client.on_connect(&blk) if blk return client end |
permalink .connected? ⇒ Boolean
Returns Connected state.
282 283 284 285 |
# File 'lib/nats/client.rb', line 282 def connected? return false unless client client.connected? end |
permalink .connected_server ⇒ URI
Returns Connected server.
276 277 278 279 |
# File 'lib/nats/client.rb', line 276 def connected_server return nil unless client client.connected_server end |
permalink .create_inbox ⇒ String
Returns a subject that can be used for “directed” communications.
370 371 372 |
# File 'lib/nats/client.rb', line 370 def create_inbox "_INBOX.#{SecureRandom.hex(13)}" end |
permalink .drain(&blk) ⇒ Object
Drain gracefully disconnects from the server, letting subscribers process pending messages already sent by server and optionally calls the associated block.
269 270 271 272 273 |
# File 'lib/nats/client.rb', line 269 def drain(&blk) if (client and !client.draining? and (client.connected? || client.reconnecting?)) client.drain { blk.call if blk } end end |
permalink .draining? ⇒ Boolean
Returns Draining state.
294 295 296 297 |
# File 'lib/nats/client.rb', line 294 def draining? return false unless client client.draining? end |
permalink .flush(*args, &blk) ⇒ Object
Flushes all messages and subscriptions in the default connection
376 377 378 |
# File 'lib/nats/client.rb', line 376 def flush(*args, &blk) (@client ||= connect).flush(*args, &blk) end |
permalink .on_close(&callback) ⇒ Object
Set the default on_closed callback.
333 334 335 336 |
# File 'lib/nats/client.rb', line 333 def on_close(&callback) @close_cb = callback @client.on_close(&callback) unless @client.nil? end |
permalink .on_disconnect(&callback) ⇒ Object
Set the default on_disconnect callback.
326 327 328 329 |
# File 'lib/nats/client.rb', line 326 def on_disconnect(&callback) @disconnect_cb = callback @client.on_disconnect(&callback) unless @client.nil? end |
permalink .on_error(&callback) ⇒ Object
Set the default on_error callback.
313 314 315 |
# File 'lib/nats/client.rb', line 313 def on_error(&callback) @err_cb, @err_cb_overridden = callback, true end |
permalink .on_reconnect(&callback) ⇒ Object
Set the default on_reconnect callback.
319 320 321 322 |
# File 'lib/nats/client.rb', line 319 def on_reconnect(&callback) @reconnect_cb = callback @client.on_reconnect(&callback) unless @client.nil? end |
permalink .options ⇒ Hash
Returns Options.
300 301 302 303 |
# File 'lib/nats/client.rb', line 300 def return {} unless client client. end |
permalink .pending_data_size(*args) ⇒ Object
Return bytes outstanding for the default client connection.
382 383 384 |
# File 'lib/nats/client.rb', line 382 def pending_data_size(*args) (@client ||= connect).pending_data_size(*args) end |
permalink .publish(*args, &blk) ⇒ Object
Publish a message using the default client connection.
340 341 342 |
# File 'lib/nats/client.rb', line 340 def publish(*args, &blk) (@client ||= connect).publish(*args, &blk) end |
permalink .reconnecting? ⇒ Boolean
Returns Reconnecting state.
288 289 290 291 |
# File 'lib/nats/client.rb', line 288 def reconnecting? return false unless client client.reconnecting? end |
permalink .request(*args, &blk) ⇒ Object
Publish a message and wait for a response on the default client connection.
364 365 366 |
# File 'lib/nats/client.rb', line 364 def request(*args, &blk) (@client ||= connect).request(*args, &blk) end |
permalink .server_info ⇒ Hash
Returns Server information.
306 307 308 309 |
# File 'lib/nats/client.rb', line 306 def server_info return nil unless client client.server_info end |
permalink .server_running?(uri) ⇒ Boolean
:nodoc:
394 395 396 397 398 399 400 401 |
# File 'lib/nats/client.rb', line 394 def server_running?(uri) # :nodoc: require 'socket' s = TCPSocket.new(uri.host, uri.port) s.close return true rescue return false end |
permalink .start(*args, &blk) ⇒ Object
Create a default client connection to the server.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/nats/client.rb', line 236 def start(*args, &blk) @reactor_was_running = EM.reactor_running? unless (@reactor_was_running || blk) raise(Error, "EM needs to be running when NATS.start is called without a run block") end # Setup optimized select versions if EM.epoll? EM.epoll elsif EM.kqueue? EM.kqueue elsif EM.library_type == :java # No warning needed, we're using Java NIO else Kernel.warn('Neither epoll nor kqueue are supported, performance may be impacted') end EM.run { @client = connect(*args, &blk) } end |
permalink .stop(&blk) ⇒ Object
Close the default client connection and optionally call the associated block.
256 257 258 259 260 261 262 263 |
# File 'lib/nats/client.rb', line 256 def stop(&blk) client.close if (client and (client.connected? || client.reconnecting?)) blk.call if blk @err_cb = nil @close_cb = nil @reconnect_cb = nil @disconnect_cb = nil end |
permalink .subscribe(*args, &blk) ⇒ Object
Subscribe using the default client connection.
346 347 348 |
# File 'lib/nats/client.rb', line 346 def subscribe(*args, &blk) (@client ||= connect).subscribe(*args, &blk) end |
permalink .timeout(*args, &blk) ⇒ Object
Set a timeout for receiving messages for the subscription.
358 359 360 |
# File 'lib/nats/client.rb', line 358 def timeout(*args, &blk) (@client ||= connect).timeout(*args, &blk) end |
permalink .unsubscribe(*args) ⇒ Object
Cancel a subscription on the default client connection.
352 353 354 |
# File 'lib/nats/client.rb', line 352 def unsubscribe(*args) (@client ||= connect).unsubscribe(*args) end |
permalink .wait_for_server(uri, max_wait = 5) ⇒ Object
:nodoc:
386 387 388 389 390 391 392 |
# File 'lib/nats/client.rb', line 386 def wait_for_server(uri, max_wait = 5) # :nodoc: start = Time.now while (Time.now - start < max_wait) # Wait max_wait seconds max break if server_running?(uri) sleep(0.1) end end |
Instance Method Details
permalink #attempt_reconnect ⇒ Object
:nodoc:
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 |
# File 'lib/nats/client.rb', line 1206 def attempt_reconnect #:nodoc: @reconnect_timer = nil current = server_pool.first # Snapshot time when trying to reconnect to server # in order to back off for subsequent attempts. current[:last_reconnect_attempt] = MonotonicTime.now current[:reconnect_attempts] ||= 0 current[:reconnect_attempts] += 1 begin EM.reconnect(@uri.host, @uri.port, self) rescue current[:error_received] = true @uri = nil @connected = false end end |
permalink #auth_connection? ⇒ Boolean
784 785 786 |
# File 'lib/nats/client.rb', line 784 def auth_connection? !@uri.user.nil? || @options[:token] || @server_info[:auth_required] end |
permalink #bind_primary ⇒ Object
:nodoc:
1341 1342 1343 1344 1345 1346 1347 |
# File 'lib/nats/client.rb', line 1341 def bind_primary #:nodoc: first = server_pool.first @uri = first[:uri] @uri.user = [:user] if [:user] @uri.password = [:pass] if [:pass] first end |
permalink #can_reuse_server?(server) ⇒ Boolean
:nodoc:
1200 1201 1202 1203 1204 |
# File 'lib/nats/client.rb', line 1200 def can_reuse_server?(server) #:nodoc: # If we will retry a number of times to reconnect to a server # unless we got an error from it already. reconnecting? && server[:reconnect_attempts] <= @options[:max_reconnect_attempts] && !server[:error_received] end |
permalink #cancel_ping_timer ⇒ Object
[View source]
1039 1040 1041 1042 1043 1044 |
# File 'lib/nats/client.rb', line 1039 def cancel_ping_timer if @ping_timer EM.cancel_timer(@ping_timer) @ping_timer = nil end end |
permalink #cancel_reconnect_timer ⇒ Object
[View source]
1169 1170 1171 1172 1173 1174 |
# File 'lib/nats/client.rb', line 1169 def cancel_reconnect_timer if @reconnect_timer EM.cancel_timer(@reconnect_timer) @reconnect_timer = nil end end |
permalink #client_using_secure_connection? ⇒ Boolean
1019 1020 1021 |
# File 'lib/nats/client.rb', line 1019 def client_using_secure_connection? @tls || @ssl end |
permalink #close ⇒ Object
Close the connection to the server.
767 768 769 770 771 772 773 |
# File 'lib/nats/client.rb', line 767 def close @closing = true cancel_ping_timer cancel_reconnect_timer close_connection_after_writing if connected? process_disconnect if reconnecting? end |
permalink #connect_command ⇒ Object
:nodoc:
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 |
# File 'lib/nats/client.rb', line 788 def connect_command #:nodoc: cs = { :verbose => @options[:verbose], :pedantic => @options[:pedantic], :lang => ::NATS::LANG, :version => ::NATS::VERSION, :protocol => ::NATS::PROTOCOL_VERSION, :echo => !@options[:no_echo] } case when @options[:user_credentials] nonce = @server_info[:nonce] cs[:jwt] = @user_jwt_cb.call cs[:sig] = @signature_cb.call(nonce) when @options[:nkeys_seed] nonce = @server_info[:nonce] cs[:nkey] = @user_nkey_cb.call cs[:sig] = @signature_cb.call(nonce) when @options[:token] cs[:auth_token] = @options[:token] when @uri.password.nil? cs[:auth_token] = @uri.user else cs[:user] = @uri.user cs[:pass] = @uri.password end if auth_connection? cs[:name] = @options[:name] if @options[:name] cs[:ssl_required] = @ssl if @ssl cs[:tls_required] = true if @tls "CONNECT #{cs.to_json}#{CR_LF}" end |
permalink #connected_server ⇒ URI
Returns Connected server.
1331 1332 1333 |
# File 'lib/nats/client.rb', line 1331 def connected_server connected? ? @uri : nil end |
permalink #connection_completed ⇒ Object
:nodoc:
1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 |
# File 'lib/nats/client.rb', line 1046 def connection_completed #:nodoc: @parse_state = AWAITING_INFO_LINE # Delay sending CONNECT or any other command here until we are sure # that we have a valid established secure connection. return if (@ssl or @tls) # Mark that we established already TCP connection to the server, # when using TLS we only do so after handshake has been completed. @connected = true end |
permalink #disconnect_error_string ⇒ Object
[View source]
1176 1177 1178 1179 |
# File 'lib/nats/client.rb', line 1176 def disconnect_error_string return "Client disconnected from server on #{@uri}" if @connected return "Could not connect to server on #{@uri}" end |
permalink #discovered_servers ⇒ Object
Retrieves the list of servers which have been discovered via server connect_urls announcements
1337 1338 1339 |
# File 'lib/nats/client.rb', line 1337 def discovered_servers server_pool.select {|s| s[:discovered] } end |
permalink #drain(&blk) ⇒ Object
Drain gracefully closes the connection.
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 |
# File 'lib/nats/client.rb', line 551 def drain(&blk) return if draining? or closing? @draining = true # Remove interest in all subjects to stop receiving messages. @subs.each do |sid, _| send_command("UNSUB #{sid} #{CR_LF}") end # Roundtrip to ensure no more messages are received. flush do drain_timeout_timer, draining_timer = nil, nil drain_timeout_timer = EM.add_timer([:drain_timeout]) do EM.cancel_timer(draining_timer) # Report the timeout via the error callback and just close err_cb.call(NATS::ClientError.new("Drain Timeout")) @draining = false close unless closing? blk.call if blk end # Periodically check for the pending data to be empty. draining_timer = EM.add_periodic_timer(0.1) do next unless closing? or @buf.nil? or @buf.empty? # Subscriptions have been drained already so disallow publishing. @drained_subs = true next unless pending_data_size == 0 EM.cancel_timer(draining_timer) EM.cancel_timer(drain_timeout_timer) # We're done draining and can close now. @draining = false close unless closing? blk.call if blk end end end |
permalink #flush(&blk) ⇒ Object
Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.
732 733 734 |
# File 'lib/nats/client.rb', line 732 def flush(&blk) queue_server_rt(&blk) if blk end |
permalink #flush_pending ⇒ Object
:nodoc:
866 867 868 869 870 |
# File 'lib/nats/client.rb', line 866 def flush_pending #:nodoc: return unless @pending send_data(@pending.join) @pending, @pending_size = nil, 0 end |
permalink #had_error? ⇒ Boolean
1161 1162 1163 |
# File 'lib/nats/client.rb', line 1161 def had_error? server_pool.first && server_pool.first[:error_received] end |
permalink #initialize(options) ⇒ Object
[View source]
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/nats/client.rb', line 456 def initialize() @options = @buf = nil @ssid, @subs = 1, {} @err_cb = NATS.err_cb @close_cb = NATS.close_cb @reconnect_cb = NATS.reconnect_cb @disconnect_cb = NATS.disconnect_cb @reconnect_timer, @needed = nil, nil @connected, @closing, @reconnecting, @conn_cb_called = false, false, false, false @msgs_received = @msgs_sent = @bytes_received = @bytes_sent = @pings = 0 @pending_size = 0 @server_info = { } # Mark whether we should be connecting securely, try best effort # in being compatible with present ssl support. @ssl = false @tls = nil @tls = [:tls] if [:tls] @ssl = [:ssl] if [:ssl] or @tls # New style request/response implementation. @resp_sub = nil @resp_map = nil @resp_sub_prefix = nil @nuid = NATS::NUID.new # Drain mode @draining = false @drained_subs = false # NKEYS @user_credentials = [:user_credentials] if [:user_credentials] @nkeys_seed = [:nkeys_seed] if [:nkeys_seed] @user_nkey_cb = nil @user_jwt_cb = nil @signature_cb = nil # NKEYS setup_nkeys_connect if @user_credentials or @nkeys_seed end |
permalink #inspect ⇒ Object
:nodoc:
1377 1378 1379 |
# File 'lib/nats/client.rb', line 1377 def inspect #:nodoc: "<nats client v#{NATS::VERSION}>" end |
permalink #multiple_servers_available? ⇒ Boolean
1157 1158 1159 |
# File 'lib/nats/client.rb', line 1157 def multiple_servers_available? server_pool && server_pool.size > 1 end |
permalink #on_close(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
762 763 764 |
# File 'lib/nats/client.rb', line 762 def on_close(&callback) @close_cb = callback end |
permalink #on_connect(&callback) ⇒ Object
Define a callback to be called when the client connection has been established.
738 739 740 |
# File 'lib/nats/client.rb', line 738 def on_connect(&callback) @connect_cb = callback end |
permalink #on_disconnect(&callback) ⇒ Object
Define a callback to be called when client is disconnected from server.
756 757 758 |
# File 'lib/nats/client.rb', line 756 def on_disconnect(&callback) @disconnect_cb = callback end |
permalink #on_error(&callback) ⇒ Object
Define a callback to be called when errors occur on the client connection.
744 745 746 |
# File 'lib/nats/client.rb', line 744 def on_error(&callback) @err_cb, @err_cb_overridden = callback, true end |
permalink #on_msg(subject, sid, reply, msg) ⇒ Object
:nodoc:
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 |
# File 'lib/nats/client.rb', line 833 def on_msg(subject, sid, reply, msg) #:nodoc: # Accounting - We should account for inbound even if they are not processed. @msgs_received += 1 @bytes_received += msg.bytesize if msg return unless sub = @subs[sid] # Check for auto_unsubscribe sub[:received] += 1 if sub[:max] # Client side support in case server did not receive unsubscribe return unsubscribe(sid) if (sub[:received] > sub[:max]) # cleanup here if we have hit the max.. @subs.delete(sid) if (sub[:received] == sub[:max]) end if cb = sub[:callback] case cb.arity when 0 then cb.call when 1 then cb.call(msg) when 2 then cb.call(msg, reply) else cb.call(msg, reply, subject) end end # Check for a timeout, and cancel if received >= expected if (sub[:timeout] && sub[:received] >= sub[:expected]) EM.cancel_timer(sub[:timeout]) sub[:timeout] = nil end end |
permalink #on_reconnect(&callback) ⇒ Object
Define a callback to be called when a reconnect attempt is made.
750 751 752 |
# File 'lib/nats/client.rb', line 750 def on_reconnect(&callback) @reconnect_cb = callback end |
permalink #pending_data_size ⇒ Object
Return bytes outstanding waiting to be sent to server.
776 777 778 |
# File 'lib/nats/client.rb', line 776 def pending_data_size get_outbound_data_size + @pending_size end |
permalink #process_connect ⇒ Object
:nodoc:
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 |
# File 'lib/nats/client.rb', line 1062 def process_connect #:nodoc: # Reset reconnect attempts since TCP connection has been successful at this point. current = server_pool.first current[:was_connected] = true current[:reconnect_attempts] ||= 0 cancel_reconnect_timer if reconnecting? # Whip through any pending SUB commands since we replay # all subscriptions already done anyway. @pending.delete_if { |sub| sub[0..2] == SUB_OP } if @pending @subs.each_pair { |k, v| send_command("SUB #{v[:subject]} #{v[:queue]} #{k}#{CR_LF}") } unless user_err_cb? or reconnecting? @err_cb = proc { |e| raise e } end # We have validated the connection at this point so send CONNECT # and any other pending commands which we need to the server. flush_pending if (connect_cb and not @conn_cb_called) # We will round trip the server here to make sure all state from any pending commands # has been processed before calling the connect callback. queue_server_rt do connect_cb.call(self) @conn_cb_called = true end end # Notify via reconnect callback that we are again plugged again into the system. if reconnecting? @reconnecting = false @reconnect_cb.call(self) unless @reconnect_cb.nil? end # Initialize ping timer and processing @pings_outstanding = 0 @pongs_received = 0 @ping_timer = EM.add_periodic_timer(@options[:ping_interval]) do send_ping end end |
permalink #process_connect_init(info) ⇒ Object
:nodoc:
934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 |
# File 'lib/nats/client.rb', line 934 def process_connect_init(info) # :nodoc: # Each JSON parser uses a different key/value pair to use symbol keys # instead of strings when parsing. Passing all three pairs assures each # parser gets what it needs. For the json gem :symbolize_name, for yajl # :symbolize_keys, and for oj :symbol_keys. @server_info = JSON.parse(info, :symbolize_keys => true, :symbolize_names => true, :symbol_keys => true) case when (server_using_secure_connection? and client_using_secure_connection?) # Allow parameterizing secure connection via EM#start_tls directly if present. start_tls(@tls || {}) when (server_using_secure_connection? and !client_using_secure_connection?) # Call unbind since there is a configuration mismatch between client/server # anyway and communication cannot happen in this state. err_cb.call(NATS::ClientError.new('TLS/SSL required by server')) close_connection_after_writing when (client_using_secure_connection? and !server_using_secure_connection?) err_cb.call(NATS::ClientError.new('TLS/SSL not supported by server')) close_connection_after_writing else # Otherwise, use a regular connection. end # Check whether there no echo is supported by the server. if @options[:no_echo] if @server_info[:proto].nil? || @server_info[:proto] < 1 err_cb.call(NATS::ServerError.new('No echo option not supported by this server')) close_connection_after_writing end end send_connect_command # Only initial INFO command is treated specially for auth reasons, # the rest are processed asynchronously to discover servers. @parse_state = AWAITING_CONTROL_LINE process_info(info) process_connect if @server_info[:auth_required] current = server_pool.first current[:auth_required] = true # Send pending connect followed by ping/pong to ensure we're authorized. queue_server_rt { current[:auth_ok] = true } end flush_pending end |
permalink #process_disconnect ⇒ Object
:nodoc:
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 |
# File 'lib/nats/client.rb', line 1181 def process_disconnect #:nodoc: # Mute error callback when user has called NATS.close on purpose. if not closing? and @err_cb # Always call error callback for compatibility with previous behavior. err_cb.call(NATS::ConnectError.new(disconnect_error_string)) end close_cb.call if @close_cb true # Chaining ensure cancel_ping_timer cancel_reconnect_timer if (NATS.client == self) NATS.clear_client EM.stop if ((connected? || reconnecting?) and closing? and not NATS.reactor_was_running?) end @connected = @reconnecting = false end |
permalink #process_info(info_line) ⇒ Object
:nodoc:
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 |
# File 'lib/nats/client.rb', line 982 def process_info(info_line) #:nodoc: info = JSON.parse(info_line, :symbolize_keys => true, :symbolize_names => true, :symbol_keys => true) # Detect any announced server that we might not be aware of... connect_urls = info[:connect_urls] if connect_urls srvs = [] connect_urls.each do |url| u = URI.parse("nats://#{url}") present = server_pool.detect do |srv| srv[:uri].host == u.host && srv[:uri].port == u.port end if not present # Let explicit user and pass options set the credentials. u.user = [:user] if [:user] u.password = [:pass] if [:pass] # Use creds from the current server if not set explicitly. if @uri and !@uri.user.nil? and !@uri.user.empty? u.user ||= @uri.user u.password ||= @uri.password end srvs << { :uri => u, :reconnect_attempts => 0, :discovered => true } end end srvs.shuffle! unless @options[:dont_randomize_servers] # Include in server pool but keep current one as the first one. server_pool.push(*srvs) end info end |
permalink #process_pong ⇒ Object
[View source]
1117 1118 1119 1120 |
# File 'lib/nats/client.rb', line 1117 def process_pong @pongs_received += 1 @pings_outstanding -= 1 end |
permalink #process_uri_options ⇒ Object
Parse out URIs which can now be an array of server choices The server pool will contain both explicit and implicit members.
1322 1323 1324 1325 1326 1327 1328 |
# File 'lib/nats/client.rb', line 1322 def #:nodoc @server_pool = [] uri = [:uris] || [:servers] || [:uri] uri = uri.kind_of?(Array) ? uri : [uri] uri.each { |u| server_pool << { :uri => u.is_a?(URI) ? u.dup : URI.parse(u) } } bind_primary end |
permalink #publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
Publish a message to a given subject, with optional reply subject and completion block
505 506 507 508 509 510 511 512 513 514 515 |
# File 'lib/nats/client.rb', line 505 def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk) return unless subject and not @drained_subs msg = msg.to_s # Accounting @msgs_sent += 1 @bytes_sent += msg.bytesize if msg send_command("PUB #{subject} #{opt_reply} #{msg.bytesize}#{CR_LF}#{msg}#{CR_LF}") queue_server_rt(&blk) if blk end |
permalink #queue_server_rt(&cb) ⇒ Object
:nodoc:
827 828 829 830 831 |
# File 'lib/nats/client.rb', line 827 def queue_server_rt(&cb) #:nodoc: return unless cb (@pongs ||= []) << cb send_command(PING_REQUEST) end |
permalink #receive_data(data) ⇒ Object
:nodoc:
872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 |
# File 'lib/nats/client.rb', line 872 def receive_data(data) #:nodoc: @buf = @buf ? @buf << data : data while (@buf) case @parse_state when AWAITING_INFO_LINE case @buf when INFO @buf = $' process_connect_init($1) else # If we are here we do not have a complete line yet that we understand. return end when AWAITING_CONTROL_LINE case @buf when MSG @buf = $' @sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i @parse_state = AWAITING_MSG_PAYLOAD when OK # No-op right now @buf = $' when ERR @buf = $' current = server_pool.first current[:error_received] = true if current[:auth_required] && !current[:auth_ok] err_cb.call(NATS::AuthError.new($1)) else err_cb.call(NATS::ServerError.new($1)) end when PING @pings += 1 @buf = $' send_command(PONG_RESPONSE) when PONG @buf = $' cb = @pongs.shift cb.call if cb when INFO @buf = $' process_info($1) when UNKNOWN @buf = $' err_cb.call(NATS::ServerError.new("Unknown protocol: #{$1}")) else # If we are here we do not have a complete line yet that we understand. return end @buf = nil if (@buf && @buf.empty?) when AWAITING_MSG_PAYLOAD return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE)) on_msg(@sub, @sid, @reply, @buf.slice(0, @needed)) @buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize) @sub = @sid = @reply = @needed = nil @parse_state = AWAITING_CONTROL_LINE @buf = nil if (@buf && @buf.empty?) end end end |
permalink #request(subject, data = nil, opts = {}, &cb) ⇒ Object
Send a request and have the response delivered to the supplied callback.
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 |
# File 'lib/nats/client.rb', line 623 def request(subject, data=nil, opts={}, &cb) return unless subject # In case of using async request then fallback to auto unsubscribe # based request/response and not break compatibility too much since # new request/response style can only be used with fibers. if cb inbox = "_INBOX.#{@nuid.next}" s = subscribe(inbox, opts) { |msg, reply| case cb.arity when 0 then cb.call when 1 then cb.call(msg) else cb.call(msg, reply) end } publish(subject, data, inbox) return s end # If this is the first request being made, then need to start # the responses mux handler that handles the responses. start_resp_mux_sub! unless @resp_sub_prefix # Generate unique token for the reply subject. token = @nuid.next inbox = "#{@resp_sub_prefix}.#{token}" # Synchronous request/response requires using a Fiber # to be able to await the response. f = Fiber.current @resp_map[token][:fiber] = f # If awaiting more than a single response then use array # to include all that could be gathered before the deadline. expected = opts[:max] ||= 1 @resp_map[token][:expected] = expected @resp_map[token][:msgs] = [] if expected > 1 # Announce the request with the inbox using the token. publish(subject, data, inbox) # If deadline expires, then discard the token and resume fiber opts[:timeout] ||= 0.5 t = EM.add_timer(opts[:timeout]) do if expected > 1 f.resume @resp_map[token][:msgs] else f.resume end @resp_map.delete(token) end # Wait for the response and cancel timeout callback if received. if expected > 1 # Wait to receive all replies that can get before deadline. msgs = Fiber.yield EM.cancel_timer(t) # Slice and throwaway responses that are not needed. return msgs.slice(0, expected) else msg = Fiber.yield EM.cancel_timer(t) return msg end end |
permalink #schedule_primary_and_connect ⇒ Object
We have failed on an attempt at the primary (first) server, rotate and try again
1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 |
# File 'lib/nats/client.rb', line 1350 def schedule_primary_and_connect #:nodoc: # Dump the one we were trying if it wasn't connected current = server_pool.shift # In case there was an error from the server we will take it out from rotation # unless we specify infinite reconnects via setting :max_reconnect_attempts to -1 if current && ([:max_reconnect_attempts] < 0 || can_reuse_server?(current)) server_pool << current end # If we are out of options, go ahead and disconnect then # handle closing connection to NATS. process_disconnect and return if server_pool.empty? # bind new one next_server = bind_primary # If the next one was connected and we are trying to reconnect # set up timer if we tried once already. if should_delay_connect?(next_server) schedule_reconnect else attempt_reconnect schedule_primary_and_connect if had_error? end end |
permalink #schedule_reconnect ⇒ Object
:nodoc:
1133 1134 1135 1136 1137 |
# File 'lib/nats/client.rb', line 1133 def schedule_reconnect #:nodoc: @reconnecting = true @connected = false @reconnect_timer = EM.add_timer(@options[:reconnect_time_wait]) { attempt_reconnect } end |
permalink #send_command(command, priority = false) ⇒ Object
:nodoc:
1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 |
# File 'lib/nats/client.rb', line 1225 def send_command(command, priority = false) #:nodoc: needs_flush = (connected? && @pending.nil?) @pending ||= [] @pending << command unless priority @pending.unshift(command) if priority @pending_size += command.bytesize EM.next_tick { flush_pending } if needs_flush flush_pending if (connected? && @pending_size > MAX_PENDING_SIZE) if (@options[:fast_producer_error] && pending_data_size > FAST_PRODUCER_THRESHOLD) err_cb.call(NATS::ClientError.new("Fast Producer: #{pending_data_size} bytes outstanding")) end true end |
permalink #send_connect_command ⇒ Object
:nodoc:
823 824 825 |
# File 'lib/nats/client.rb', line 823 def send_connect_command #:nodoc: send_command(connect_command, true) end |
permalink #send_ping ⇒ Object
:nodoc:
1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 |
# File 'lib/nats/client.rb', line 1105 def send_ping #:nodoc: return if @closing @pings_outstanding += 1 if @pings_outstanding > @options[:max_outstanding_pings] close_connection #close return end queue_server_rt { process_pong } flush_pending end |
permalink #server_using_secure_connection? ⇒ Boolean
1023 1024 1025 |
# File 'lib/nats/client.rb', line 1023 def server_using_secure_connection? @server_info[:ssl_required] || @server_info[:tls_required] end |
permalink #setup_nkeys_connect ⇒ Object
[View source]
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 |
# File 'lib/nats/client.rb', line 1242 def setup_nkeys_connect begin require 'nkeys' require 'base64' rescue LoadError raise(Error, "nkeys is not installed") end case when @nkeys_seed @user_nkey_cb = proc { seed = File.read(@nkeys_seed).chomp kp = NKEYS::from_seed(seed) # Take a copy since original will be gone with the wipe. pub_key = kp.public_key.dup kp.wipe! pub_key } @signature_cb = proc { |nonce| seed = File.read(@nkeys_seed).chomp kp = NKEYS::from_seed(seed) raw_signed = kp.sign(nonce) kp.wipe! encoded = Base64.urlsafe_encode64(raw_signed) encoded.gsub('=', '') } when @user_credentials # When the credentials are within a single decorated file. @user_jwt_cb = proc { jwt_start = "BEGIN NATS USER JWT".freeze found = false jwt = nil File.readlines(@user_credentials).each do |line| case when found jwt = line.chomp break when line.include?(jwt_start) found = true end end raise(Error, "No JWT found in #{@user_credentials}") if not found jwt } @signature_cb = proc { |nonce| seed_start = "BEGIN USER NKEY SEED".freeze found = false seed = nil File.readlines(@user_credentials).each do |line| case when found seed = line.chomp break when line.include?(seed_start) found = true end end raise(Error, "No nkey user seed found in #{@user_credentials}") if not found kp = NKEYS::from_seed(seed) raw_signed = kp.sign(nonce) # seed is a reference so also cleared when doing wipe, # which can be done since Ruby strings are mutable. kp.wipe encoded = Base64.urlsafe_encode64(raw_signed) # Remove padding encoded.gsub('=', '') } end end |
permalink #should_delay_connect?(server) ⇒ Boolean
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 |
# File 'lib/nats/client.rb', line 1122 def should_delay_connect?(server) case when server[:was_connected] server[:reconnect_attempts] >= 0 when server[:last_reconnect_attempt] (MonotonicTime.now - server[:last_reconnect_attempt]) < @options[:reconnect_time_wait] else false end end |
permalink #should_not_reconnect? ⇒ Boolean
1165 1166 1167 |
# File 'lib/nats/client.rb', line 1165 def should_not_reconnect? !@options[:reconnect] end |
permalink #ssl_handshake_completed ⇒ Object
[View source]
1058 1059 1060 |
# File 'lib/nats/client.rb', line 1058 def ssl_handshake_completed @connected = true end |
permalink #ssl_verify_peer(cert) ⇒ Object
[View source]
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 |
# File 'lib/nats/client.rb', line 1027 def ssl_verify_peer(cert) incoming = OpenSSL::X509::Certificate.new(cert) store = OpenSSL::X509::Store.new store.set_default_paths store.add_file @options[:tls][:ca_file] result = store.verify(incoming) err_cb.call(NATS::ConnectError.new('TLS Verification failed checking issuer based on CA %s' % @options[:tls][:ca_file])) unless result result rescue NATS::ConnectError false end |
permalink #start_resp_mux_sub! ⇒ Object
[View source]
691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 |
# File 'lib/nats/client.rb', line 691 def start_resp_mux_sub! @resp_sub_prefix = "_INBOX.#{@nuid.next}" @resp_map = Hash.new { |h,k| h[k] = { }} # Single subscription that will be handling all the requests # using fibers to yield the responses. subscribe("#{@resp_sub_prefix}.*") do |msg, reply, subject| token = subject.split('.').last # Discard the response if requestor not interested already. next unless @resp_map.key? token # Take fiber that will be passed the response f = @resp_map[token][:fiber] expected = @resp_map[token][:expected] if expected == 1 f.resume msg @resp_map.delete(token) next end if @resp_map[token][:msgs].size < expected @resp_map[token][:msgs] << msg msgs = @resp_map[token][:msgs] if msgs.size >= expected f.resume(msgs) else # Wait to gather more messages or timeout. next end end @resp_map.delete(token) end end |
permalink #subscribe(subject, opts = {}, &callback) ⇒ Object
Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to #unsubscribe.
525 526 527 528 529 530 531 532 533 534 535 |
# File 'lib/nats/client.rb', line 525 def subscribe(subject, opts={}, &callback) return unless subject and not draining? sid = (@ssid += 1) sub = @subs[sid] = { :subject => subject, :callback => callback, :received => 0 } sub[:queue] = opts[:queue] if opts[:queue] sub[:max] = opts[:max] if opts[:max] send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}") # Setup server support for auto-unsubscribe unsubscribe(sid, opts[:max]) if opts[:max] sid end |
permalink #subscription_count ⇒ Number
Return the active subscription count.
593 594 595 |
# File 'lib/nats/client.rb', line 593 def subscription_count @subs.size end |
permalink #timeout(sid, timeout, opts = {}, &callback) ⇒ Object
Setup a timeout for receiving messages for the subscription.
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 |
# File 'lib/nats/client.rb', line 601 def timeout(sid, timeout, opts={}, &callback) # Setup a timeout if requested return unless sub = @subs[sid] auto_unsubscribe, expected = true, 1 auto_unsubscribe = opts[:auto_unsubscribe] if opts.key?(:auto_unsubscribe) expected = opts[:expected] if opts.key?(:expected) EM.cancel_timer(sub[:timeout]) if sub[:timeout] sub[:timeout] = EM.add_timer(timeout) do unsubscribe(sid) if auto_unsubscribe callback.call(sid) if callback end sub[:expected] = expected end |
permalink #unbind ⇒ Object
:nodoc:
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 |
# File 'lib/nats/client.rb', line 1139 def unbind #:nodoc: # Allow notifying from which server we were disconnected, # but only when we didn't trigger disconnecting ourselves. if @disconnect_cb and connected? and not closing? @disconnect_cb.call(NATS::ConnectError.new(disconnect_error_string)) end # If we are closing or shouldn't reconnect, go ahead and disconnect. process_disconnect and return if (closing? or should_not_reconnect?) @reconnecting = true if connected? @connected = false @pending = @pongs = nil @buf = nil cancel_ping_timer schedule_primary_and_connect end |
permalink #unsubscribe(sid, opt_max = nil) ⇒ Object
Cancel a subscription.
540 541 542 543 544 545 546 547 |
# File 'lib/nats/client.rb', line 540 def unsubscribe(sid, opt_max=nil) return if draining? opt_max_str = " #{opt_max}" unless opt_max.nil? send_command("UNSUB #{sid}#{opt_max_str}#{CR_LF}") return unless sub = @subs[sid] sub[:max] = opt_max @subs.delete(sid) unless (sub[:max] && (sub[:received] < sub[:max])) end |
permalink #user_err_cb? ⇒ Boolean
:nodoc:
780 781 782 |
# File 'lib/nats/client.rb', line 780 def user_err_cb? # :nodoc: err_cb_overridden || NATS.err_cb_overridden end |