Class: Jp::AbstractConsumer

Inherits:
AbstractClient show all
Defined in:
lib/rb/jp/consumer.rb

Direct Known Subclasses

JsonConsumer, TextConsumer, ThriftConsumer

Instance Method Summary collapse

Constructor Details

#initialize(queue, options = {}, &block) ⇒ AbstractConsumer

Returns a new instance of AbstractConsumer.



11
12
13
14
# File 'lib/rb/jp/consumer.rb', line 11

# Builds a consumer and remembers the block it was given.
#
# @param queue passed through unchanged to the superclass initializer
# @param options [Hash] passed through unchanged to the superclass
#   initializer (defaults to an empty hash)
# @param block the block supplied by the caller; kept in @block
#   (nil when no block is given)
def initialize(queue, options = {}, &block)
  # Only queue and options are forwarded; the block is kept locally.
  super(queue, options)
  @block = block
end

Instance Method Details

#poll ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rb/jp/consumer.rb', line 32

# Calls +consume+ over and over until it raises EmptyPool.
#
# @return [Integer] how many +consume+ calls completed before EmptyPool
#   was raised (0 when the very first call raises)
def poll
  consumed = 0
  loop do
    consume
    # Only reached when consume returned without raising.
    consumed += 1
  end
rescue EmptyPool
  consumed
end

#run ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rb/jp/consumer.rb', line 16

# Sets up a Rev timer that calls +poll+ each time it fires, attaches it to a
# freshly created Rev::Loop, and runs that loop.
#
# The timer is constructed with @options[:poll_interval] and +true+
# (NOTE(review): the second argument is presumably Rev's "repeating" flag;
# confirm against the Rev::TimerWatcher API).
#
# @return whatever Rev::Loop#run returns
def run
  timer = Rev::TimerWatcher.new(@options[:poll_interval], true)

  # Give this one watcher a writable callback slot and an on_timer hook
  # that invokes it; other TimerWatcher instances are left untouched.
  class << timer
    attr_writer :block

    def on_timer
      @block.call
    end
  end

  # The lambda closes over this consumer, so +poll+ runs on the consumer
  # rather than on the timer object.
  timer.block = lambda { poll }

  event_loop = Rev::Loop.new
  timer.attach(event_loop)
  event_loop.run
end