Class: Nsqrb::Consumer
- Inherits:
-
Object
- Object
- Nsqrb::Consumer
- Defined in:
- lib/nsqrb/consumer.rb
Constant Summary collapse
- TCP_BUFFER =
64.kilobytes
- PROTOCOL_VERSION =
"v2"
Instance Attribute Summary collapse
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#messages ⇒ Object
readonly
Returns the value of attribute messages.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#responses ⇒ Object
readonly
Returns the value of attribute responses.
Instance Method Summary collapse
- #close! ⇒ Object
- #confirm(message) ⇒ Object
- #connect! ⇒ Object
-
#initialize(options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #receive ⇒ Object
- #requeue(message, timeout = 0) ⇒ Object
- #touch(message) ⇒ Object
Constructor Details
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
3 4 5 |
# File 'lib/nsqrb/consumer.rb', line 3 def errors @errors end |
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
3 4 5 |
# File 'lib/nsqrb/consumer.rb', line 3 def messages @messages end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
3 4 5 |
# File 'lib/nsqrb/consumer.rb', line 3 def options @options end |
#responses ⇒ Object (readonly)
Returns the value of attribute responses.
3 4 5 |
# File 'lib/nsqrb/consumer.rb', line 3 def responses @responses end |
Instance Method Details
#close! ⇒ Object
40 41 42 43 44 45 |
# File 'lib/nsqrb/consumer.rb', line 40 def close! return if @socket.nil? || @socket.closed? @socket.write Command::Cls.new.to_line @socket.close @socket = nil end |
#confirm(message) ⇒ Object
28 29 30 |
# File 'lib/nsqrb/consumer.rb', line 28 def confirm(message) @socket.write Command::Fin.new(message_id: message.id).to_line end |
#connect! ⇒ Object
47 48 49 50 51 52 53 54 55 |
# File 'lib/nsqrb/consumer.rb', line 47 def connect! close! if @socket && !@socket.closed? @socket = TCPSocket.open(options[:host], options[:port]) @socket.write PROTOCOL_VERSION.rjust(4).upcase @socket.write Command::Identify.new(identify_defaults.merge(options[:features] || {})).to_line @socket.write Command::Sub.new(topic_name: options[:topic], channel_name: options[:channel]).to_line @socket.write Command::Rdy.new(count: 1).to_line puts 'Ready to receive!' end |
#receive ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/nsqrb/consumer.rb', line 16 def receive begin buffer = @socket.recv(TCP_BUFFER) @parser.add(buffer) frames = @parser.parse frames.each { |frame| handle(frame) } rescue => e close! raise e end end |