Class: Nsq::Connection
- Inherits:
-
Object
- Object
- Nsq::Connection
- Includes:
- AttributeLogger
- Defined in:
- lib/nsq/connection.rb
Constant Summary collapse
- USER_AGENT =
"nsq-ruby/#{Nsq::Version::STRING}"- RESPONSE_HEARTBEAT =
'_heartbeat_'- RESPONSE_OK =
'OK'- @@log_attributes =
[:host, :port]
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#presumed_in_flight ⇒ Object
readonly
Returns the value of attribute presumed_in_flight.
Instance Method Summary collapse
-
#close ⇒ Object
close the connection and don’t try to re-open it.
- #connected? ⇒ Boolean
- #dpub(topic, delay_in_ms, message) ⇒ Object
- #fin(message_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #mpub(topic, messages) ⇒ Object
- #pub(topic, message) ⇒ Object
- #rdy(count) ⇒ Object
-
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!.
- #req(message_id, timeout) ⇒ Object
- #sub(topic, channel) ⇒ Object
- #touch(message_id) ⇒ Object
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/nsq/connection.rb', line 26 def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @msg_timeout = opts[:msg_timeout] || 60_000 # 60s @max_in_flight = opts[:max_in_flight] || 1 @ssl_context = opts[:ssl_context] validate_ssl_context! if @ssl_context if @msg_timeout < 1000 raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' end # for outgoing communication @write_queue = Queue.new # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across # threads (from write_loop and read_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @presumed_in_flight = 0 open_connection start_monitoring_connection end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
16 17 18 |
# File 'lib/nsq/connection.rb', line 16 def host @host end |
#max_in_flight ⇒ Object
Returns the value of attribute max_in_flight.
18 19 20 |
# File 'lib/nsq/connection.rb', line 18 def max_in_flight @max_in_flight end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
17 18 19 |
# File 'lib/nsq/connection.rb', line 17 def port @port end |
#presumed_in_flight ⇒ Object (readonly)
Returns the value of attribute presumed_in_flight.
19 20 21 |
# File 'lib/nsq/connection.rb', line 19 def presumed_in_flight @presumed_in_flight end |
Instance Method Details
#close ⇒ Object
close the connection and don’t try to re-open it
63 64 65 66 |
# File 'lib/nsq/connection.rb', line 63 def close stop_monitoring_connection close_connection end |
#connected? ⇒ Boolean
57 58 59 |
# File 'lib/nsq/connection.rb', line 57 def connected? @connected end |
#dpub(topic, delay_in_ms, message) ⇒ Object
100 101 102 |
# File 'lib/nsq/connection.rb', line 100 def dpub(topic, delay_in_ms, ) write ["DPUB #{topic} #{delay_in_ms}\n", .bytesize, ].pack('a*l>a*') end |
#fin(message_id) ⇒ Object
79 80 81 82 |
# File 'lib/nsq/connection.rb', line 79 def fin() write "FIN #{message_id}\n" decrement_in_flight end |
#mpub(topic, messages) ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/nsq/connection.rb', line 104 def mpub(topic, ) body = .map do || [.bytesize, ].pack('l>a*') end.join write ["MPUB #{topic}\n", body.bytesize, .size, body].pack('a*l>l>a*') end |
#pub(topic, message) ⇒ Object
96 97 98 |
# File 'lib/nsq/connection.rb', line 96 def pub(topic, ) write ["PUB #{topic}\n", .bytesize, ].pack('a*l>a*') end |
#rdy(count) ⇒ Object
74 75 76 |
# File 'lib/nsq/connection.rb', line 74 def rdy(count) write "RDY #{count}\n" end |
#re_up_ready ⇒ Object
Tell the server we are ready for more messages!
114 115 116 117 118 119 120 |
# File 'lib/nsq/connection.rb', line 114 def re_up_ready rdy(@max_in_flight) # assume these messages are coming our way. yes, this might not be the # case, but it's much easier to manage our RDY state with the server if # we treat things this way. @presumed_in_flight = @max_in_flight end |
#req(message_id, timeout) ⇒ Object
85 86 87 88 |
# File 'lib/nsq/connection.rb', line 85 def req(, timeout) write "REQ #{message_id} #{timeout}\n" decrement_in_flight end |
#sub(topic, channel) ⇒ Object
69 70 71 |
# File 'lib/nsq/connection.rb', line 69 def sub(topic, channel) write "SUB #{topic} #{channel}\n" end |
#touch(message_id) ⇒ Object
91 92 93 |
# File 'lib/nsq/connection.rb', line 91 def touch() write "TOUCH #{message_id}\n" end |