Class: Nsqrb::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/nsqrb/consumer.rb

Constant Summary collapse

TCP_BUFFER =
64.kilobytes
PROTOCOL_VERSION =
"v2"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Consumer

Returns a new instance of Consumer.



8
9
10
11
12
13
14
# File 'lib/nsqrb/consumer.rb', line 8

# Builds a consumer that is not yet connected; no socket is opened here.
#
# @param options [Hash] connection settings, stored as given and exposed
#   through #options. #connect! reads :host, :port, :topic, :channel and the
#   optional :features hash from it.
def initialize(options = {})
  @options = options

  # Empty collections behind the #responses, #errors and #messages readers.
  @responses = []
  @errors = []
  @messages = []

  # Parser that #receive feeds with the raw data read from the socket.
  @parser = Parser.new
end

Instance Attribute Details

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



3
4
5
# File 'lib/nsqrb/consumer.rb', line 3

# Read-only accessor for the @errors collection, which #initialize sets to
# an empty Array.
# NOTE(review): presumably filled by the frame handler called from #receive,
# which is not visible here — confirm.
#
# @return [Array] the array object held by the instance itself, not a copy
def errors
  @errors
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.



3
4
5
# File 'lib/nsqrb/consumer.rb', line 3

# Read-only accessor for the @messages collection, which #initialize sets to
# an empty Array.
# NOTE(review): presumably filled by the frame handler called from #receive,
# which is not visible here — confirm.
#
# @return [Array] the array object held by the instance itself, not a copy
def messages
  @messages
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



3
4
5
# File 'lib/nsqrb/consumer.rb', line 3

# Read-only accessor for the options given to #initialize (an empty Hash
# when none were passed). #connect! reads :host, :port, :topic, :channel
# and :features from it.
#
# @return [Hash] the same object that was passed to #initialize
def options
  @options
end

#responses ⇒ Object (readonly)

Returns the value of attribute responses.



3
4
5
# File 'lib/nsqrb/consumer.rb', line 3

# Read-only accessor for the @responses collection, which #initialize sets
# to an empty Array.
# NOTE(review): presumably filled by the frame handler called from #receive,
# which is not visible here — confirm.
#
# @return [Array] the array object held by the instance itself, not a copy
def responses
  @responses
end

Instance Method Details

#close! ⇒ Object



40
41
42
43
44
45
# File 'lib/nsqrb/consumer.rb', line 40

# Sends the CLS command and closes the connection.
#
# Does nothing when there is no socket or it is already closed. The socket
# is closed and the @socket reference cleared even when writing CLS fails
# (for example because the peer has already gone away), so a failed goodbye
# can no longer leave an open descriptor behind. The write error itself
# still propagates to the caller.
#
# @return [nil]
# @raise [IOError, SystemCallError] whatever the socket raises while writing
#   or closing
def close!
  return if @socket.nil? || @socket.closed?

  begin
    @socket.write Command::Cls.new.to_line
  ensure
    # Drop the reference first so that a failing #close cannot leave a
    # half-dead socket attached to the consumer.
    socket = @socket
    @socket = nil
    socket.close
  end
end

#confirm(message) ⇒ Object



28
29
30
# File 'lib/nsqrb/consumer.rb', line 28

# Writes a FIN command for +message+ to the socket (in the NSQ protocol this
# marks the message as successfully processed).
#
# @param message [#id] the message to acknowledge; only its id is used
# @return [Object] whatever the socket's #write returns
def confirm(message)
  fin = Command::Fin.new(message_id: message.id)
  @socket.write(fin.to_line)
end

#connect! ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/nsqrb/consumer.rb', line 47

# Opens a TCP connection to options[:host]:options[:port] and performs the
# handshake: protocol magic, IDENTIFY, SUB, then RDY with a count of 1.
# An existing open socket is closed via #close! first.
#
# Prints 'Ready to receive!' to standard output when done.
#
# @return [nil]
def connect!
  close! if @socket && !@socket.closed?
  @socket = TCPSocket.open(options[:host], options[:port])

  # "v2" right-justified to four characters and upcased gives "  V2".
  magic = PROTOCOL_VERSION.rjust(4).upcase
  @socket.write(magic)

  # Caller-supplied :features override the identify defaults.
  defaults = identify_defaults
  overrides = options[:features] || {}
  identify = Command::Identify.new(defaults.merge(overrides))
  @socket.write(identify.to_line)

  subscribe = Command::Sub.new(topic_name: options[:topic], channel_name: options[:channel])
  @socket.write(subscribe.to_line)

  ready = Command::Rdy.new(count: 1)
  @socket.write(ready.to_line)

  puts 'Ready to receive!'
end

#receive ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/nsqrb/consumer.rb', line 16

# Reads up to TCP_BUFFER bytes from the socket, feeds them to the parser and
# passes every frame the parser returns to #handle.
#
# On any StandardError the connection is shut down with #close! and the
# original error is re-raised. #close! writes to the same socket that may
# just have failed, so an error raised while closing is deliberately
# ignored: otherwise it would replace the root cause the caller needs.
#
# NOTE(review): a peer-closed stream makes recv return an empty string (nil
# on newer Rubies); that value is handed to the parser unchanged — confirm
# the parser/caller treat it as a disconnect.
#
# @return [Object] the frame collection returned by the parser
# @raise [StandardError] the error raised while reading, parsing or handling
def receive
  buffer = @socket.recv(TCP_BUFFER)
  @parser.add(buffer)
  @parser.parse.each { |frame| handle(frame) }
rescue => e
  begin
    close!
  rescue StandardError
    # Best-effort cleanup only; the original error below is what matters.
  end
  raise e
end

#requeue(message, timeout = 0) ⇒ Object



32
33
34
# File 'lib/nsqrb/consumer.rb', line 32

# Writes a REQ command for +message+ to the socket (in the NSQ protocol this
# puts the message back on the queue).
#
# @param message [#id] the message to requeue; only its id is used
# @param timeout [Object] passed through unchanged as the command's
#   :timeout (defaults to 0)
# @return [Object] whatever the socket's #write returns
def requeue(message, timeout = 0)
  req = Command::Req.new(message_id: message.id, timeout: timeout)
  @socket.write(req.to_line)
end

#touch(message) ⇒ Object



36
37
38
# File 'lib/nsqrb/consumer.rb', line 36

# Writes a TOUCH command for +message+ to the socket (in the NSQ protocol
# this resets the message's processing timeout).
#
# @param message [#id] the message to touch; only its id is used
# @return [Object] whatever the socket's #write returns
def touch(message)
  command = Command::Touch.new(message_id: message.id)
  @socket.write(command.to_line)
end