Module: NATS

Defined in:
lib/nats/client.rb

Defined Under Namespace

Classes: ClientError, ConnectError, Error, ServerError

Constant Summary collapse

VERSION =
'0.4.28'.freeze
DEFAULT_PORT =
4222
DEFAULT_URI =
"nats://localhost:#{DEFAULT_PORT}".freeze
MAX_RECONNECT_ATTEMPTS =
10
RECONNECT_TIME_WAIT =
2
MAX_PENDING_SIZE =
32768
FAST_PRODUCER_THRESHOLD =

Maximum outbound size per client to trigger FP, 10MB

(10*1024*1024)
MSG =

Protocol

/\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i
OK =

:nodoc:

/\A\+OK\s*\r\n/i
ERR =

:nodoc:

/\A-ERR\s+('.+')?\r\n/i
PING =

:nodoc:

/\APING\s*\r\n/i
PONG =

:nodoc:

/\APONG\s*\r\n/i
INFO =

:nodoc:

/\AINFO\s+([^\r\n]+)\r\n/i
UNKNOWN =

:nodoc:

/\A(.*)\r\n/
CR_LF =

Responses

("\r\n".freeze)
CR_LF_SIZE =

:nodoc:

(CR_LF.bytesize)
PING_REQUEST =

:nodoc:

("PING#{CR_LF}".freeze)
PONG_RESPONSE =

:nodoc:

("PONG#{CR_LF}".freeze)
EMPTY_MSG =

:nodoc:

(''.freeze)
SUB =

Used for future pedantic Mode

/^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/
SUB_NO_WC =

:nodoc:

/^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/
AWAITING_CONTROL_LINE =

Parser

1
AWAITING_MSG_PAYLOAD =

:nodoc:

2
AUTOSTART_PID_FILE =

Autostart properties

'/tmp/nats-server.pid'
AUTOSTART_LOG_FILE =
'/tmp/nats-server.log'
@@tried_autostart =

Duplicate autostart protection

{}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.clientObject (readonly)

:nodoc:



70
71
72
# File 'lib/nats/client.rb', line 70

# Reader for the module-level @client: the default connection that NATS.start
# assigns and that the delegating helpers lazily create via `@client ||= connect`.
def client
  @client
end

.err_cbObject (readonly)

:nodoc:



70
71
72
# File 'lib/nats/client.rb', line 70

# Reader for the module-level error callback. It is set by NATS.on_error,
# defaulted to a re-raising proc in NATS.connect, and reset to nil by NATS.stop.
def err_cb
  @err_cb
end

.err_cb_overriddenObject (readonly)

:nodoc:



70
71
72
# File 'lib/nats/client.rb', line 70

# Reader for the flag that NATS.on_error sets to true when a caller installs
# a module-level error callback; nil until then.
def err_cb_overridden
  @err_cb_overridden
end

.reactor_was_runningObject (readonly) Also known as: reactor_was_running?

:nodoc:



70
71
72
# File 'lib/nats/client.rb', line 70

# Reader for the value of EM.reactor_running? captured at the beginning of
# NATS.start, i.e. whether the reactor was already up before this library ran it.
def reactor_was_running
  @reactor_was_running
end

.reconnect_cbObject (readonly)

:nodoc



71
72
73
# File 'lib/nats/client.rb', line 71

# Reader for the module-level reconnect callback installed by NATS.on_reconnect.
# New connections copy it in their initializer.
def reconnect_cb
  @reconnect_cb
end

.timeout_cbObject

:nodoc



72
73
74
# File 'lib/nats/client.rb', line 72

# Reader for the module-level @timeout_cb.
# NOTE(review): nothing in the visible source assigns @timeout_cb - confirm
# whether this attribute is still used.
def timeout_cb
  @timeout_cb
end

Instance Attribute Details

#bytes_receivedObject (readonly)

Returns the value of attribute bytes_received.



277
278
279
# File 'lib/nats/client.rb', line 277

# Running total of inbound payload bytes; #on_msg adds msg.bytesize for every
# message it is handed. Starts at 0.
def bytes_received
  @bytes_received
end

#bytes_sentObject (readonly)

Returns the value of attribute bytes_sent.



277
278
279
# File 'lib/nats/client.rb', line 277

# Running total of published payload bytes; #publish adds the payload's
# bytesize on every call. Starts at 0.
def bytes_sent
  @bytes_sent
end

#closingObject (readonly) Also known as: closing?

:nodoc



276
277
278
# File 'lib/nats/client.rb', line 276

# False after construction; set to true by #close and never reset in the
# visible code.
def closing
  @closing
end

#connect_cbObject (readonly)

:nodoc:



275
276
277
# File 'lib/nats/client.rb', line 275

# Reader for the callback registered through #on_connect; nil when none was given.
def connect_cb
  @connect_cb
end

#connectedObject (readonly) Also known as: connected?

:nodoc:



275
276
277
# File 'lib/nats/client.rb', line 275

# Connection flag: set to true by #connection_completed (non-SSL) or
# #ssl_handshake_completed, and back to false by #schedule_reconnect and
# #process_disconnect.
def connected
  @connected
end

#err_cbObject (readonly)

:nodoc:



275
276
277
# File 'lib/nats/client.rb', line 275

# Reader for this connection's error callback. It is seeded from NATS.err_cb
# in the initializer and replaced by #on_error.
def err_cb
  @err_cb
end

#err_cb_overriddenObject (readonly)

:nodoc:



275
276
277
# File 'lib/nats/client.rb', line 275

# Reader for the flag that #on_error sets to true when a caller installs an
# error callback on this connection; nil until then.
def err_cb_overridden
  @err_cb_overridden
end

#msgs_receivedObject (readonly)

Returns the value of attribute msgs_received.



277
278
279
# File 'lib/nats/client.rb', line 277

# Count of inbound messages; #on_msg increments it for every message, even
# when no matching subscription exists. Starts at 0.
def msgs_received
  @msgs_received
end

#msgs_sentObject (readonly)

Returns the value of attribute msgs_sent.



277
278
279
# File 'lib/nats/client.rb', line 277

# Count of messages handed to #publish (calls with a nil subject are not
# counted). Starts at 0.
def msgs_sent
  @msgs_sent
end

#optionsObject (readonly)

:nodoc



276
277
278
# File 'lib/nats/client.rb', line 276

# Reader for the options hash that was passed to the initializer.
def options
  @options
end

#pingsObject (readonly)

Returns the value of attribute pings.



277
278
279
# File 'lib/nats/client.rb', line 277

# Count of PING control lines parsed by #receive_data. Starts at 0.
def pings
  @pings
end

#reconnectingObject (readonly) Also known as: reconnecting?

:nodoc



276
277
278
# File 'lib/nats/client.rb', line 276

# Reconnect flag: set to true by #schedule_reconnect and cleared by
# #connection_completed and #process_disconnect.
def reconnecting
  @reconnecting
end

#server_infoObject (readonly)

:nodoc



276
277
278
# File 'lib/nats/client.rb', line 276

# Reader for the hash that #process_info parsed from the server's INFO line
# (keys are symbols); nil until an INFO line has been processed.
def server_info
  @server_info
end

Class Method Details

.clear_clientObject

:nodoc:



244
245
246
# File 'lib/nats/client.rb', line 244

# Forgets the module-level default client by setting @client to nil.
# Called from #process_disconnect when the default client goes away.
def clear_client # :nodoc:
  @client = nil
end

.connect(opts = {}, &blk) ⇒ NATS

Create and return a connection to the server with the given options. The server will be autostarted if requested and the uri is determined to be local. The optional block will be called when the connection has been completed.

Parameters:

  • opts (Hash) (defaults to: {})
  • &blk (Block)

    called when the connection is completed. Connection will be passed to the block.

Options Hash (opts):

  • :uri (String|URI)

    The URI to connect to, example nats://localhost:4222

  • :autostart (Boolean)

    Boolean that can be used to engage server autostart functionality.

  • :reconnect (Boolean)

    Boolean that can be used to suppress reconnect functionality.

  • :debug (Boolean)

    Boolean that can be used to output additional debug information.

  • :verbose (Boolean)

    Boolean that is sent to server for setting verbose protocol mode.

  • :pedantic (Boolean)

    Boolean that is sent to server for setting pedantic mode.

  • :ssl (Boolean)

    Boolean that is sent to server for setting TLS/SSL mode.

  • :max_reconnect_attempts (Integer)

    Integer that can be used to set the max number of reconnect tries

  • :reconnect_time_wait (Integer)

    Integer that can be used to set the number of seconds to wait between reconnect tries

Returns:

  • (NATS)

    connection to the server.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/nats/client.rb', line 92

# Opens a connection to the server and returns it.
#
# +opts+ is filled in place: unset options receive their defaults, NATS_*
# environment variables (when present) override whatever was passed in, and
# :uri is normalized to a URI object. The resulting hash is handed to
# EM.connect. When a block is given it is registered as the connection's
# on_connect callback.
def connect(opts={}, &blk)
  # Defaults, applied only where the caller left the option nil.
  {
    :verbose                => false,
    :pedantic               => false,
    :reconnect              => true,
    :ssl                    => false,
    :max_reconnect_attempts => MAX_RECONNECT_ATTEMPTS,
    :reconnect_time_wait    => RECONNECT_TIME_WAIT
  }.each { |key, default| opts[key] = default if opts[key].nil? }

  # The URI falls back to the environment, then to the built-in default.
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI

  # Boolean switches: any casing of "true" enables, everything else disables.
  [
    [:verbose,             'NATS_VERBOSE'],
    [:pedantic,            'NATS_PEDANTIC'],
    [:debug,               'NATS_DEBUG'],
    [:reconnect,           'NATS_RECONNECT'],
    [:fast_producer_error, 'NATS_FAST_PRODUCER'],
    [:ssl,                 'NATS_SSL']
  ].each do |key, var|
    opts[key] = (ENV[var].downcase == 'true') unless ENV[var].nil?
  end

  # Integer settings.
  [
    [:max_reconnect_attempts, 'NATS_MAX_RECONNECT_ATTEMPTS'],
    [:reconnect_time_wait,    'NATS_RECONNECT_TIME_WAIT']
  ].each do |key, var|
    opts[key] = ENV[var].to_i unless ENV[var].nil?
  end

  target = opts[:uri]
  @uri = opts[:uri] = target.is_a?(URI) ? target : URI.parse(target)
  # Without a user-supplied handler, errors are simply re-raised.
  @err_cb = proc { |e| raise e } unless err_cb
  check_autostart(@uri) if opts[:autostart] == true

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  client
end

.connected?Boolean

Returns Connected state.

Returns:

  • (Boolean)

    Connected state



142
143
144
145
# File 'lib/nats/client.rb', line 142

# Whether the default client exists and reports itself connected.
# Answers false when there is no default client.
def connected?
  client ? client.connected? : false
end

.create_inboxString

Returns a subject that can be used for “directed” communications.

Returns:



209
210
211
212
213
# File 'lib/nats/client.rb', line 209

# Builds a random "_INBOX." subject: five 16-bit random values followed by
# one 24-bit value, rendered as 26 zero-padded lowercase hex digits.
def create_inbox
  chunks = Array.new(5) { rand(0x0010000) }
  chunks << rand(0x1000000)
  format("_INBOX.%04x%04x%04x%04x%04x%06x", *chunks)
end

.flush(*args, &blk) ⇒ Object

Flushes all messages and subscriptions in the default connection

See Also:



217
218
219
# File 'lib/nats/client.rb', line 217

# Forwards to #flush on the default client, first creating that client with
# `connect` when none has been set up yet.
def flush(*args, &blk)
  @client ||= connect
  @client.flush(*args, &blk)
end

.on_error(&callback) ⇒ Object

Set the default on_error callback.

Parameters:

  • &callback (Block)

    called when an error has been detected.



167
168
169
# File 'lib/nats/client.rb', line 167

# Installs +callback+ as the module-level error callback and records that it
# was user-supplied by setting @err_cb_overridden to true.
def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end

.on_reconnect(&callback) ⇒ Object

Set the default on_reconnect callback.

Parameters:

  • &callback (Block)

    called when a reconnect attempt is made.



173
174
175
# File 'lib/nats/client.rb', line 173

# Stores +callback+ as the module-level reconnect callback. Connections copy
# it when they are initialized.
def on_reconnect(&callback)
  @reconnect_cb = callback
end

.optionsHash

Returns Options.

Returns:

  • (Hash)

    Options



154
155
156
157
# File 'lib/nats/client.rb', line 154

# Options of the default client, or an empty hash when no default client exists.
def options
  client ? client.options : {}
end

.pending_data_size(*args) ⇒ Object

Return bytes outstanding for the default client connection.

See Also:



223
224
225
# File 'lib/nats/client.rb', line 223

# Forwards to #pending_data_size on the default client, first creating that
# client with `connect` when none has been set up yet.
def pending_data_size(*args)
  @client ||= connect
  @client.pending_data_size(*args)
end

.publish(*args, &blk) ⇒ Object

Publish a message using the default client connection.

See Also:



179
180
181
# File 'lib/nats/client.rb', line 179

# Forwards to #publish on the default client, first creating that client with
# `connect` when none has been set up yet.
def publish(*args, &blk)
  @client ||= connect
  @client.publish(*args, &blk)
end

.reconnecting?Boolean

Returns Reconnecting state.

Returns:

  • (Boolean)

    Reconnecting state



148
149
150
151
# File 'lib/nats/client.rb', line 148

# Whether the default client exists and is currently trying to reconnect.
# Answers false when there is no default client.
def reconnecting?
  client ? client.reconnecting? : false
end

.request(*args, &blk) ⇒ Object

Publish a message and wait for a response on the default client connection.

See Also:



203
204
205
# File 'lib/nats/client.rb', line 203

# Forwards to #request on the default client, first creating that client with
# `connect` when none has been set up yet.
def request(*args, &blk)
  @client ||= connect
  @client.request(*args, &blk)
end

.server_infoHash

Returns Server information.

Returns:

  • (Hash)

    Server information



160
161
162
163
# File 'lib/nats/client.rb', line 160

# Server information of the default client, or nil when no default client exists.
def server_info
  client.server_info if client
end

.server_running?(uri) ⇒ Boolean

:nodoc:

Returns:

  • (Boolean)


235
236
237
238
239
240
241
242
# File 'lib/nats/client.rb', line 235

# Probes uri.host:uri.port with a throwaway TCP connection.
# Returns true when the connection could be opened and closed, false when
# anything (StandardError) goes wrong along the way.
def server_running?(uri) # :nodoc:
  require 'socket'
  TCPSocket.new(uri.host, uri.port).close
  true
rescue
  false
end

.start(*args, &blk) ⇒ Object

Create a default client connection to the server.

See Also:

  • connect


122
123
124
125
126
127
128
129
130
# File 'lib/nats/client.rb', line 122

# Creates the default client connection inside an EM.run block.
#
# Remembers whether the reactor was already running before this call, and
# raises Error when it was not and no block was supplied.
def start(*args, &blk)
  @reactor_was_running = EM.reactor_running?
  if !@reactor_was_running && !blk
    raise(Error, "EM needs to be running when NATS.start called without a run block")
  end
  # Setup optimized select versions
  EM.epoll
  EM.kqueue
  EM.run { @client = connect(*args, &blk) }
end

.stop(&blk) ⇒ Object

Close the default client connection and optionally call the associated block.

Parameters:

  • &blk (Block)

    called when the connection is closed.



134
135
136
137
138
139
# File 'lib/nats/client.rb', line 134

# Closes the default client when it is connected or reconnecting, invokes the
# optional block right away, then resets the autostart bookkeeping and the
# module-level error callback.
def stop(&blk)
  active = client && (client.connected? || client.reconnecting?)
  client.close if active
  blk.call if blk
  @@tried_autostart = {}
  @err_cb = nil
end

.subscribe(*args, &blk) ⇒ Object

Subscribe using the default client connection.

See Also:



185
186
187
# File 'lib/nats/client.rb', line 185

# Forwards to #subscribe on the default client, first creating that client
# with `connect` when none has been set up yet.
def subscribe(*args, &blk)
  @client ||= connect
  @client.subscribe(*args, &blk)
end

.timeout(*args, &blk) ⇒ Object

Set a timeout for receiving messages for the subscription.

See Also:



197
198
199
# File 'lib/nats/client.rb', line 197

# Forwards to #timeout on the default client, first creating that client with
# `connect` when none has been set up yet.
def timeout(*args, &blk)
  @client ||= connect
  @client.timeout(*args, &blk)
end

.unsubscribe(*args) ⇒ Object

Cancel a subscription on the default client connection.

See Also:



191
192
193
# File 'lib/nats/client.rb', line 191

# Forwards to #unsubscribe on the default client, first creating that client
# with `connect` when none has been set up yet.
def unsubscribe(*args)
  @client ||= connect
  @client.unsubscribe(*args)
end

.wait_for_server(uri, max_wait = 5) ⇒ Object

:nodoc:



227
228
229
230
231
232
233
# File 'lib/nats/client.rb', line 227

# Polls server_running?(uri) every 0.1s until it answers true or +max_wait+
# seconds have elapsed. Always returns nil.
def wait_for_server(uri, max_wait = 5) # :nodoc:
  deadline = Time.now + max_wait
  until Time.now >= deadline
    break if server_running?(uri)
    sleep(0.1)
  end
end

Instance Method Details

#attempt_reconnectObject

:nodoc:



610
611
612
613
614
# File 'lib/nats/client.rb', line 610

# One tick of the reconnect timer.
#
# Once the number of attempts exceeds @options[:max_reconnect_attempts] the
# connection is torn down via process_disconnect and nothing else happens.
# Otherwise EventMachine is asked to reconnect and the reconnect callback
# (if any) is notified.
def attempt_reconnect #:nodoc:
  if (@reconnect_attempts += 1) > @options[:max_reconnect_attempts]
    # Give up unconditionally. The former `process_disconnect and return`
    # only returned when process_disconnect produced a truthy value, which
    # depends on what the error callback returns; a callback returning nil
    # let the reconnect below run even though the limit had been reached.
    process_disconnect
    return
  end
  EM.reconnect(@uri.host, @uri.port, self)
  @reconnect_cb.call unless @reconnect_cb.nil?
end

#closeObject

Close the connection to the server.



419
420
421
422
423
424
# File 'lib/nats/client.rb', line 419

# Marks the connection as closing, cancels a pending reconnect timer, closes
# the socket after buffered writes when connected, and finalizes the
# disconnect when a reconnect was in progress.
def close
  @closing = true
  timer = @reconnect_timer
  EM.cancel_timer(timer) if timer
  close_connection_after_writing if connected?
  process_disconnect if reconnecting?
end

#connection_completedObject

:nodoc:



558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# File 'lib/nats/client.rb', line 558

# Runs when the connection attempt has completed (called with no arguments).
#
# Without SSL the connection is marked connected and pending commands are
# flushed immediately; with SSL both are left to ssl_handshake_completed.
# On a reconnect the timer is cancelled and CONNECT plus every known
# subscription are re-sent. On a first connect the default re-raising error
# callback is installed (unless the user supplied one) and the connect
# callback is scheduled behind a server round trip.
def connection_completed #:nodoc:
  plaintext = !@ssl
  @connected = true if plaintext
  if reconnecting?
    EM.cancel_timer(@reconnect_timer)
    send_connect_command
    @subs.each_pair do |sid, sub|
      send_command("SUB #{sub[:subject]} #{sub[:queue]} #{sid}#{CR_LF}")
    end
  end
  flush_pending if plaintext
  @err_cb = proc { |e| raise e } if !user_err_cb? && !reconnecting?
  # Round trip the server first so that all state from any pending commands
  # has been processed before the connect callback fires.
  queue_server_rt { connect_cb.call(self) } if connect_cb && !reconnecting?
  @reconnecting = false
  @parse_state = AWAITING_CONTROL_LINE
end

#disconnect_error_stringObject



593
594
595
596
# File 'lib/nats/client.rb', line 593

# Message used for the ConnectError raised on disconnect: distinguishes an
# established connection that was lost from one that never came up.
def disconnect_error_string
  if @connected
    "Client disconnected from server on #{@uri}."
  else
    "Could not connect to server on #{@uri}"
  end
end

#flush(&blk) ⇒ Object

Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.



396
397
398
# File 'lib/nats/client.rb', line 396

# Queues a server round trip (PING) whose completion invokes +blk+.
# Does nothing and returns nil when no block is given.
def flush(&blk)
  queue_server_rt(&blk) if blk
end

#flush_pendingObject

:nodoc:



486
487
488
489
490
# File 'lib/nats/client.rb', line 486

# Writes all queued commands to the connection in a single send_data call and
# resets the queue (@pending back to nil, @pending_size back to 0).
# Does nothing when no commands are queued.
def flush_pending #:nodoc:
  return unless @pending
  send_data(@pending.join)
  @pending, @pending_size = nil, 0
end

#initialize(options) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/nats/client.rb', line 283

# Sets up per-connection state from +options+ and queues the CONNECT command.
#
# Credentials given as :user / :pass are written onto the URI object found in
# options[:uri]; callbacks are seeded from the module-level defaults.
def initialize(options)
  @options = options
  @uri = options[:uri]
  @uri.user = options[:user] if options[:user]
  @uri.password = options[:pass] if options[:pass]
  @ssl = options[:ssl] if options[:ssl]

  # Subscription bookkeeping
  @ssid = 1
  @subs = {}

  # Callbacks inherited from the module-level defaults
  @err_cb = NATS.err_cb
  @reconnect_cb = NATS.reconnect_cb

  # Connection and parser state
  @reconnect_timer = nil
  @needed = nil
  @connected = false
  @closing = false
  @reconnecting = false

  # Statistics
  @msgs_received = 0
  @msgs_sent = 0
  @bytes_received = 0
  @bytes_sent = 0
  @pings = 0
  @pending_size = 0

  send_connect_command
end

#inspectObject

:nodoc:



629
630
631
# File 'lib/nats/client.rb', line 629

# Short fixed representation that only reveals the client library version.
def inspect #:nodoc:
  "<nats client v#{NATS::VERSION}>"
end

#on_connect(&callback) ⇒ Object

Define a callback to be called when the client connection has been established.

Parameters:

  • callback (Block)


402
403
404
# File 'lib/nats/client.rb', line 402

# Stores +callback+ as the connect callback. #connection_completed invokes it
# with this connection after a server round trip, on the initial connect only.
def on_connect(&callback)
  @connect_cb = callback
end

#on_error(&callback) ⇒ Object

Define a callback to be called when errors occur on the client connection.

Parameters:

  • &callback (Block)

    called when an error occurs on the connection.



408
409
410
# File 'lib/nats/client.rb', line 408

# Installs +callback+ as this connection's error callback and records that it
# was user-supplied by setting @err_cb_overridden to true.
def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end

#on_msg(subject, sid, reply, msg) ⇒ Object

:nodoc:



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# File 'lib/nats/client.rb', line 451

# Dispatches one inbound message to the subscription identified by +sid+.
#
# subject - subject the message was published on
# sid     - subscription id taken from the MSG control line
# reply   - optional reply subject (nil when absent)
# msg     - payload
#
# Statistics are updated for every message. The subscription callback is
# invoked with as many of (msg, reply, subject) as its arity asks for.
# Subscriptions with a :max are removed once that many messages arrived, and
# anything beyond :max triggers an unsubscribe instead of a delivery.
def on_msg(subject, sid, reply, msg) #:nodoc:

  # Accounting - We should account for inbound even if they are not processed.
  @msgs_received += 1
  @bytes_received += msg.bytesize if msg

  return unless sub = @subs[sid]

  # Check for auto_unsubscribe
  sub[:received] += 1
  if sub[:max]
    # Client side support in case server did not receive unsubscribe
    return unsubscribe(sid) if (sub[:received] > sub[:max])
    # cleanup here if we have hit the max..
    @subs.delete(sid) if (sub[:received] == sub[:max])
  end
  # (A second, identical over-max check used to follow here; it could never
  # fire because the one above had already returned.)

  if cb = sub[:callback]
    case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      when 2 then cb.call(msg, reply)
      else cb.call(msg, reply, subject)
    end
  end

  # Check for a timeout, and cancel if received >= expected
  if (sub[:timeout] && sub[:received] >= sub[:expected])
    EM.cancel_timer(sub[:timeout])
    sub[:timeout] = nil
  end
end

#on_reconnect(&callback) ⇒ Object

Define a callback to be called when a reconnect attempt is made.

Parameters:

  • &callback (Block)

    called when a reconnect attempt is made.



414
415
416
# File 'lib/nats/client.rb', line 414

# Stores +callback+ as this connection's reconnect callback;
# #attempt_reconnect calls it (without arguments) after each reconnect attempt.
def on_reconnect(&callback)
  @reconnect_cb = callback
end

#pending_data_sizeObject

Return bytes outstanding waiting to be sent to server.



427
428
429
# File 'lib/nats/client.rb', line 427

# Sum of get_outbound_data_size and the bytes of commands still queued
# locally in @pending (@pending_size).
# NOTE(review): get_outbound_data_size is not defined in this file -
# presumably inherited from the EventMachine connection; confirm.
def pending_data_size
  get_outbound_data_size + @pending_size
end

#process_disconnectObject

:nodoc:



598
599
600
601
602
603
604
605
606
607
608
# File 'lib/nats/client.rb', line 598

# Final teardown once the connection is considered gone.
#
# Reports a NATS::ConnectError to the error callback unless the client is
# closing on purpose. Whether or not that callback raises, the reconnect
# timer is cancelled, the module-level default client is cleared when it is
# this connection (stopping EM if the reactor was not already running before
# NATS.start and this was a deliberate close), and the connected/reconnecting
# flags are reset.
#
# Returns true so callers can chain on it. Previously the `true` sat in the
# ensure clause, whose value Ruby discards, so the method actually returned
# whatever the error callback returned (or nil).
def process_disconnect #:nodoc:
  begin
    err_cb.call(NATS::ConnectError.new(disconnect_error_string)) if !closing? && @err_cb
  ensure
    EM.cancel_timer(@reconnect_timer) if @reconnect_timer
    if (NATS.client == self)
      NATS.clear_client
      EM.stop if ((connected? || reconnecting?) && closing? && !NATS.reactor_was_running?)
    end
    @connected = @reconnecting = false
  end
  true # Chaining
end

#process_info(info) ⇒ Object

:nodoc:



539
540
541
542
543
544
545
546
547
548
549
550
551
# File 'lib/nats/client.rb', line 539

# Parses the JSON payload of the server's INFO line into @server_info (with
# symbol keys) and reconciles TLS expectations: starts TLS when both sides
# want it, otherwise reports a ClientError when only one side does.
# Returns the parsed hash.
def process_info(info) #:nodoc:
  @server_info = JSON.parse(info, :symbolize_keys => true, :symbolize_names => true)
  server_requires_tls = @server_info[:ssl_required]
  if server_requires_tls && @ssl
    start_tls
  elsif server_requires_tls
    err_cb.call(NATS::ClientError.new('TLS/SSL required by server'))
  elsif @ssl
    err_cb.call(NATS::ClientError.new('TLS/SSL not supported by server'))
  end
  @server_info
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object

Publish a message to a given subject, with optional reply subject and completion block

Parameters:

  • subject (String)
  • msg (Object, #to_s) (defaults to: EMPTY_MSG)
  • opt_reply (String) (defaults to: nil)
  • blk, (Block)

    closure called when publish has been processed by the server.



304
305
306
307
308
309
310
311
312
313
314
# File 'lib/nats/client.rb', line 304

# Publishes +msg+ (converted with to_s) on +subject+, optionally naming a
# reply subject. Updates the sent counters and queues a PUB command. When a
# block is given it is invoked after a server round trip. Calls with a nil
# subject are ignored.
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  return unless subject
  payload = msg.to_s

  # Accounting
  @msgs_sent += 1
  @bytes_sent += payload.bytesize if payload

  send_command("PUB #{subject} #{opt_reply} #{payload.bytesize}#{CR_LF}#{payload}#{CR_LF}")
  queue_server_rt(&blk) if blk
end

#queue_server_rt(&cb) ⇒ Object

:nodoc:



445
446
447
448
449
# File 'lib/nats/client.rb', line 445

# Registers +cb+ to be run when the matching PONG arrives and sends a PING to
# trigger it. Does nothing without a block.
def queue_server_rt(&cb) #:nodoc:
  return unless cb
  @pongs ||= []
  @pongs << cb
  send_command(PING_REQUEST)
end

#receive_data(data) ⇒ Object

:nodoc:



492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'lib/nats/client.rb', line 492

# Incremental protocol parser, fed with each chunk of bytes read from the
# socket.
#
# Data is accumulated in @buf and consumed in a loop driven by @parse_state:
# * AWAITING_CONTROL_LINE - match one complete control line (MSG, +OK, -ERR,
#   PING, PONG, INFO, or anything else terminated by CRLF) and act on it.
#   A MSG line records subject/sid/reply/size and switches to payload mode.
# * AWAITING_MSG_PAYLOAD  - wait until the announced number of bytes plus the
#   trailing CRLF are buffered, then hand the payload to on_msg.
# Returns as soon as the buffer holds no complete unit; the remainder stays
# in @buf for the next call.
def receive_data(data) #:nodoc:
  @buf = @buf ? @buf << data : data
  while (@buf)
    case @parse_state
    when AWAITING_CONTROL_LINE
      case @buf
      when MSG
        @buf = $'
        @sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i
        @parse_state = AWAITING_MSG_PAYLOAD
      when OK # No-op right now
        @buf = $'
      when ERR
        @buf = $'
        err_cb.call(NATS::ServerError.new($1))
      when PING
        @pings += 1
        @buf = $'
        send_command(PONG_RESPONSE)
      when PONG
        @buf = $'
        # @pongs is only created by queue_server_rt, so it is still nil when
        # a PONG shows up before any round trip was queued; ignore it then
        # instead of failing on nil.
        cb = @pongs.shift if @pongs
        cb.call if cb
      when INFO
        @buf = $'
        process_info($1)
      when UNKNOWN
        @buf = $'
        err_cb.call(NATS::ServerError.new("Unknown protocol: #{$1}"))
      else
        # If we are here we do not have a complete line yet that we understand.
        return
      end
      @buf = nil if (@buf && @buf.empty?)

    when AWAITING_MSG_PAYLOAD
      # Need the full payload plus its terminating CRLF before dispatching.
      return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE))
      on_msg(@sub, @sid, @reply, @buf.slice(0, @needed))
      @buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize)
      @sub = @sid = @reply = @needed = nil
      @parse_state = AWAITING_CONTROL_LINE
      @buf = nil if (@buf && @buf.empty?)
    end

  end
end

#request(subject, data = nil, opts = {}, &cb) ⇒ Object

Send a request and have the response delivered to the supplied callback.

Parameters:

  • subject (String)
  • data (Object) (defaults to: nil)
  • opts (Hash) (defaults to: {})
  • &cb (Block)

Returns:

  • (Object)

    sid



379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/nats/client.rb', line 379

# Sends +data+ on +subject+ with a freshly created inbox as reply subject and
# subscribes to that inbox (passing +opts+ to subscribe) to receive responses.
#
# The block is invoked with as many of (msg, reply) as its arity asks for.
# Calling without a block is allowed; responses are then received and
# dropped instead of failing on a nil callback.
#
# Returns the subscription id of the inbox subscription, or nil when
# +subject+ is nil.
def request(subject, data=nil, opts={}, &cb)
  return unless subject
  inbox = NATS.create_inbox
  s = subscribe(inbox, opts) { |msg, reply|
    # No response handler was supplied: nothing to deliver to.
    next unless cb
    case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
    end
  }
  publish(subject, data, inbox)
  return s
end

#schedule_reconnect(wait = RECONNECT_TIME_WAIT) ⇒ Object

:nodoc:



578
579
580
581
582
583
# File 'lib/nats/client.rb', line 578

# Switches the connection into reconnecting state, resets the attempt counter
# and starts a periodic timer that calls attempt_reconnect every +wait+ seconds.
def schedule_reconnect(wait=RECONNECT_TIME_WAIT) #:nodoc:
  @connected = false
  @reconnecting = true
  @reconnect_attempts = 0
  @reconnect_timer = EM.add_periodic_timer(wait) { attempt_reconnect }
end

#send_command(command, priority = false) ⇒ Object

:nodoc:



616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/nats/client.rb', line 616

# Queues +command+ for sending; priority commands go to the front of the queue.
#
# While connected, the first command queued onto an empty queue schedules a
# flush on the next reactor tick, and the queue is flushed immediately once
# it grows past MAX_PENDING_SIZE bytes. With the :fast_producer_error option
# set, a ClientError is reported when the outstanding data exceeds
# FAST_PRODUCER_THRESHOLD. Always returns true.
def send_command(command, priority = false) #:nodoc:
  EM.next_tick { flush_pending } if connected? && @pending.nil?
  @pending ||= []
  if priority
    @pending.unshift(command)
  else
    @pending << command
  end
  @pending_size += command.bytesize
  flush_pending if connected? && @pending_size > MAX_PENDING_SIZE
  if @options[:fast_producer_error] && pending_data_size > FAST_PRODUCER_THRESHOLD
    err_cb.call(NATS::ClientError.new("Fast Producer: #{pending_data_size} bytes outstanding"))
  end
  true
end

#send_connect_commandObject

:nodoc:



435
436
437
438
439
440
441
442
443
# File 'lib/nats/client.rb', line 435

# Queues the CONNECT command at the front of the pending queue. Its JSON body
# carries the verbose/pedantic flags, the URI credentials when a user is
# present, and ssl_required when SSL was requested.
def send_connect_command #:nodoc:
  connect_opts = { :verbose => @options[:verbose], :pedantic => @options[:pedantic] }
  connect_opts.update(:user => @uri.user, :pass => @uri.password) if @uri.user
  connect_opts[:ssl_required] = @ssl if @ssl
  send_command("CONNECT #{connect_opts.to_json}#{CR_LF}", true)
end

#ssl_handshake_completedObject



553
554
555
556
# File 'lib/nats/client.rb', line 553

# Marks the connection as connected and sends the commands queued so far.
# connection_completed skips both steps when SSL is enabled, so for SSL
# connections they happen here instead.
def ssl_handshake_completed
  @connected = true
  flush_pending
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to #unsubscribe.

Parameters:

  • subject, (String)

    optionally with wildcards.

  • opts, (Hash)

    optional options hash, e.g. :queue, :max.

  • callback, (Block)

    called when a message is delivered.

Returns:

  • (Object)

    sid, Subject Identifier



324
325
326
327
328
329
330
331
332
333
334
# File 'lib/nats/client.rb', line 324

# Registers a subscription on +subject+ and queues the SUB command.
#
# opts may carry :queue (queue group) and :max (auto-unsubscribe after that
# many messages, which is also sent to the server right away).
# Returns the new subscription id, or nil when +subject+ is nil.
def subscribe(subject, opts={}, &callback)
  return unless subject
  @ssid += 1
  sid = @ssid
  sub = { :subject => subject, :callback => callback, :received => 0 }
  sub[:queue] = opts[:queue] if opts[:queue]
  sub[:max] = opts[:max] if opts[:max]
  @subs[sid] = sub
  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  # Setup server support for auto-unsubscribe
  unsubscribe(sid, opts[:max]) if opts[:max]
  sid
end

#subscription_countNumber

Return the active subscription count.

Returns:

  • (Number)


349
350
351
# File 'lib/nats/client.rb', line 349

# Number of entries currently held in the subscription table (@subs).
def subscription_count
  @subs.size
end

#timeout(sid, timeout, opts = {}, &callback) ⇒ Object

Setup a timeout for receiving messages for the subscription.

Parameters:

  • sid (Object)
  • timeout, (Number)

    float in seconds

  • opts, (Hash)

    options, :auto_unsubscribe(true), :expected(1)



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/nats/client.rb', line 357

# Arms a one-shot timer for subscription +sid+ that fires after +timeout+
# seconds, replacing any timer already set for it.
#
# When the timer fires the subscription is unsubscribed (unless
# opts[:auto_unsubscribe] is given as false/nil) and the block, if any, is
# called with the sid. opts[:expected] (default 1) is stored on the
# subscription as the number of messages that satisfies the timeout.
# Returns nil for an unknown sid.
def timeout(sid, timeout, opts={}, &callback)
  # Setup a timeout if requested
  return unless sub = @subs[sid]

  auto_unsubscribe = opts.fetch(:auto_unsubscribe, true)
  expected = opts.fetch(:expected, 1)

  previous = sub[:timeout]
  EM.cancel_timer(previous) if previous

  sub[:timeout] = EM.add_timer(timeout) do
    unsubscribe(sid) if auto_unsubscribe
    callback.call(sid) if callback
  end
  sub[:expected] = expected
end

#unbindObject

:nodoc:



585
586
587
588
589
590
591
# File 'lib/nats/client.rb', line 585

# Runs when the socket has gone away (called with no arguments). An
# established connection that was not closed on purpose, is not already
# reconnecting and has the :reconnect option set enters the reconnect cycle;
# otherwise the disconnect is finalized, unless a reconnect is already under way.
def unbind #:nodoc:
  should_reconnect = connected? && !closing? && !reconnecting? && @options[:reconnect]
  if should_reconnect
    schedule_reconnect(@options[:reconnect_time_wait])
  elsif !reconnecting?
    process_disconnect
  end
end

#unsubscribe(sid, opt_max = nil) ⇒ Object

Cancel a subscription.

Parameters:

  • sid (Object)
  • opt_max, (Number)

    optional number of responses to receive before auto-unsubscribing



339
340
341
342
343
344
345
# File 'lib/nats/client.rb', line 339

# Queues an UNSUB command for +sid+, optionally with a message limit.
#
# The local subscription is removed right away unless +opt_max+ is given and
# fewer than that many messages have been received so far; in that case the
# limit is only recorded on the subscription.
def unsubscribe(sid, opt_max=nil)
  limit_suffix = opt_max.nil? ? nil : " #{opt_max}"
  send_command("UNSUB #{sid}#{limit_suffix}#{CR_LF}")
  return unless sub = @subs[sid]
  sub[:max] = opt_max
  still_waiting = sub[:max] && (sub[:received] < sub[:max])
  @subs.delete(sid) unless still_waiting
end

#user_err_cb?Boolean

:nodoc:

Returns:

  • (Boolean)


431
432
433
# File 'lib/nats/client.rb', line 431

# Truthy when an error callback was installed by the user, either on this
# connection (#on_error) or at module level (NATS.on_error).
def user_err_cb? # :nodoc:
  err_cb_overridden || NATS.err_cb_overridden
end