Class: Nsq::Connection

Inherits:
Object
  • Object
show all
Includes:
AttributeLogger
Defined in:
lib/nsq/connection.rb

Constant Summary collapse

# User-agent string; embeds the gem version taken from Nsq::Version::STRING.
USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"
# NOTE(review): presumably the body of nsqd's heartbeat response -- its use is not shown here; confirm.
RESPONSE_HEARTBEAT =
'_heartbeat_'
# NOTE(review): presumably the body of nsqd's success response -- its use is not shown here; confirm.
RESPONSE_OK =
'OK'
# NOTE(review): presumably read by the included AttributeLogger module -- confirm.
@@log_attributes =
[:host, :port]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AttributeLogger

included

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/nsq/connection.rb', line 26

# Stores the connection settings, then calls open_connection and
# start_monitoring_connection (both defined elsewhere in this class).
#
# Keys read from +opts+:
#   :host          - required
#   :port          - required
#   :queue, :topic, :channel - stored as given
#   :msg_timeout   - in milliseconds; defaults to 60_000
#   :max_in_flight - defaults to 1
#   :ssl_context   - optional; validate_ssl_context! is called when present
#
# @param opts [Hash] connection options (see above)
# @raise [ArgumentError] if :host or :port is missing, or if the message
#   timeout is below 1000
def initialize(opts = {})
  # host and port are the only mandatory settings.
  @host = opts[:host] || raise(ArgumentError, 'host is required')
  @port = opts[:port] || raise(ArgumentError, 'port is required')
  @queue   = opts[:queue]
  @topic   = opts[:topic]
  @channel = opts[:channel]
  @msg_timeout   = opts[:msg_timeout] || 60_000 # milliseconds (60s)
  @max_in_flight = opts[:max_in_flight] || 1

  @ssl_context = opts[:ssl_context]
  validate_ssl_context! if @ssl_context

  raise ArgumentError, "msg_timeout cannot be less than 1000. it's in milliseconds." if @msg_timeout < 1000

  # Queue for outgoing communication.
  @write_queue = Queue.new

  # Signals that the connection has died. A Queue is used so nobody has to
  # poll; it carries that signal across threads (from write_loop and
  # read_loop to connect_and_monitor).
  @death_queue = Queue.new

  @presumed_in_flight = 0
  @connected = false

  open_connection
  start_monitoring_connection
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



16
17
18
# File 'lib/nsq/connection.rb', line 16

# Reader for the host this connection was configured with.
#
# @return [Object] the value of @host, which #initialize takes from opts[:host]
def host
  @host
end

#max_in_flight ⇒ Object

Returns the value of attribute max_in_flight.



18
19
20
# File 'lib/nsq/connection.rb', line 18

# Reader for the RDY count this connection advertises (see #re_up_ready).
#
# @return [Object] the value of @max_in_flight; #initialize sets it to
#   opts[:max_in_flight], or to 1 when that option is not given
def max_in_flight
  @max_in_flight
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



17
18
19
# File 'lib/nsq/connection.rb', line 17

# Reader for the port this connection was configured with.
#
# @return [Object] the value of @port, which #initialize takes from opts[:port]
def port
  @port
end

#presumed_in_flight ⇒ Object (readonly)

Returns the value of attribute presumed_in_flight.



19
20
21
# File 'lib/nsq/connection.rb', line 19

# Reader for the number of messages assumed to be on their way to us.
#
# #initialize starts it at 0 and #re_up_ready sets it to @max_in_flight.
# NOTE(review): #fin and #req call decrement_in_flight, which presumably
# lowers it -- that helper is not shown here; confirm.
#
# @return [Object] the value of @presumed_in_flight
def presumed_in_flight
  @presumed_in_flight
end

Instance Method Details

#close ⇒ Object

Close the connection and don’t try to re-open it.



63
64
65
66
# File 'lib/nsq/connection.rb', line 63

# Close the connection and don't try to re-open it.
#
# Calls stop_monitoring_connection and then close_connection (both defined
# elsewhere in this class).
# NOTE(review): presumably the monitor is stopped first so that it cannot
# re-open the connection that is about to be closed -- confirm.
#
# @return [Object] whatever close_connection returns
def close
  stop_monitoring_connection
  close_connection
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/nsq/connection.rb', line 57

# Reports the connection flag held in @connected.
#
# #initialize sets the flag to false before calling open_connection.
# NOTE(review): it is presumably updated by the open/close helpers, which
# are not shown here -- confirm.
#
# @return [Boolean] the current value of @connected
def connected?
  @connected
end

#dpub(topic, delay_in_ms, message) ⇒ Object



100
101
102
# File 'lib/nsq/connection.rb', line 100

# Queue a deferred publish of one message to +topic+.
#
# The frame handed to #write is the "DPUB <topic> <delay_in_ms>\n" command
# line, the message size as a 4-byte big-endian signed integer, then the raw
# message bytes.
#
# @param topic [Object] interpolated into the command line
# @param delay_in_ms [Object] interpolated into the command line
# @param message [String] message body; must respond to #bytesize
# @return [Object] whatever #write returns
def dpub(topic, delay_in_ms, message)
  command = "DPUB #{topic} #{delay_in_ms}\n"
  frame = [command, message.bytesize, message].pack('a*l>a*')
  write(frame)
end

#fin(message_id) ⇒ Object



79
80
81
82
# File 'lib/nsq/connection.rb', line 79

# Write a "FIN <message_id>\n" command and then call decrement_in_flight
# (defined elsewhere in this class).
#
# @param message_id [Object] interpolated into the command line
# @return [Object] whatever decrement_in_flight returns
def fin(message_id)
  command = "FIN #{message_id}\n"
  write(command)
  decrement_in_flight
end

#mpub(topic, messages) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/nsq/connection.rb', line 104

# Queue a multi-message publish to +topic+.
#
# Each message is encoded as its size (4-byte big-endian signed integer)
# followed by its raw bytes. The frame handed to #write is the
# "MPUB <topic>\n" command line, the total size of the encoded messages, the
# number of messages (both as 4-byte big-endian signed integers), and then
# the encoded messages themselves.
#
# @param topic [Object] interpolated into the command line
# @param messages [Array<String>] message bodies; each must respond to #bytesize
# @return [Object] whatever #write returns
def mpub(topic, messages)
  encoded = messages.map { |msg| [msg.bytesize, msg].pack('l>a*') }
  payload = encoded.join
  header = "MPUB #{topic}\n"

  write([header, payload.bytesize, messages.size, payload].pack('a*l>l>a*'))
end

#pub(topic, message) ⇒ Object



96
97
98
# File 'lib/nsq/connection.rb', line 96

# Queue a publish of one message to +topic+.
#
# The frame handed to #write is the "PUB <topic>\n" command line, the message
# size as a 4-byte big-endian signed integer, then the raw message bytes.
#
# @param topic [Object] interpolated into the command line
# @param message [String] message body; must respond to #bytesize
# @return [Object] whatever #write returns
def pub(topic, message)
  command = "PUB #{topic}\n"
  frame = [command, message.bytesize, message].pack('a*l>a*')
  write(frame)
end

#rdy(count) ⇒ Object



74
75
76
# File 'lib/nsq/connection.rb', line 74

# Write a "RDY <count>\n" command.
#
# @param count [Object] interpolated into the command line
# @return [Object] whatever #write returns
def rdy(count)
  command = "RDY #{count}\n"
  write(command)
end

#re_up_ready ⇒ Object

Tell the server we are ready for more messages!



114
115
116
117
118
119
120
# File 'lib/nsq/connection.rb', line 114

# Tell the server we are ready for more messages.
#
# Sends RDY with the current max_in_flight and records that same number as
# the presumed in-flight count.
#
# @return [Object] the count that was sent and stored
def re_up_ready
  # Read @max_in_flight once: the attribute is writable, so two separate
  # reads could disagree and leave @presumed_in_flight out of step with the
  # RDY count that was actually sent.
  ready_count = @max_in_flight
  rdy(ready_count)
  # Assume these messages are coming our way. That might not be the case,
  # but it is much easier to manage our RDY state with the server if we
  # treat things this way.
  @presumed_in_flight = ready_count
end

#req(message_id, timeout) ⇒ Object



85
86
87
88
# File 'lib/nsq/connection.rb', line 85

# Write a "REQ <message_id> <timeout>\n" command and then call
# decrement_in_flight (defined elsewhere in this class).
#
# @param message_id [Object] interpolated into the command line
# @param timeout [Object] interpolated into the command line
# @return [Object] whatever decrement_in_flight returns
def req(message_id, timeout)
  command = "REQ #{message_id} #{timeout}\n"
  write(command)
  decrement_in_flight
end

#sub(topic, channel) ⇒ Object



69
70
71
# File 'lib/nsq/connection.rb', line 69

# Write a "SUB <topic> <channel>\n" command.
#
# @param topic [Object] interpolated into the command line
# @param channel [Object] interpolated into the command line
# @return [Object] whatever #write returns
def sub(topic, channel)
  command = "SUB #{topic} #{channel}\n"
  write(command)
end

#touch(message_id) ⇒ Object



91
92
93
# File 'lib/nsq/connection.rb', line 91

# Write a "TOUCH <message_id>\n" command.
#
# @param message_id [Object] interpolated into the command line
# @return [Object] whatever #write returns
def touch(message_id)
  command = "TOUCH #{message_id}\n"
  write(command)
end