Class: Jp::AbstractConsumer

Inherits:
AbstractClient show all
Defined in:
lib/rb/jp/consumer.rb

Direct Known Subclasses

JsonConsumer, TextConsumer, ThriftConsumer

Instance Method Summary collapse

Constructor Details

#initialize(queue, options = {}, &block) ⇒ AbstractConsumer

Returns a new instance of AbstractConsumer.



11
12
13
14
# File 'lib/rb/jp/consumer.rb', line 11

# Builds a consumer for +queue+ and remembers the block handed to it.
#
# @param queue the queue identifier, forwarded unchanged to the superclass
# @param options [Hash] client options, forwarded unchanged to the superclass
# @param block the block to keep in @block (nil when none is given);
#   presumably invoked by subclasses for each consumed message -- confirm
#   against the consume implementation
def initialize(queue, options = {}, &block)
	super(queue, options)
	@block = block
end

Instance Method Details

#poll ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rb/jp/consumer.rb', line 32

# Drains the pool by calling +consume+ over and over until it raises
# EmptyPool.
#
# @return [Integer] how many +consume+ calls completed before EmptyPool
#   was raised
def poll
	consumed = 0
	# The loop has no normal exit of its own; EmptyPool is what ends it.
	loop do
		consume
		consumed += 1
	end
rescue EmptyPool
	consumed
end

#run ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rb/jp/consumer.rb', line 16

# Creates a Rev timer whose +on_timer+ callback calls +poll+ on this
# consumer, attaches it to a fresh Rev::Loop and runs that loop.
#
# The timer is built from @options[:poll_interval]; the second constructor
# argument (true) presumably makes the timer repeating -- confirm against
# the Rev::TimerWatcher documentation.
#
# @return whatever Rev::Loop#run returns
def run
	timer = Rev::TimerWatcher.new(@options[:poll_interval], true)

	# Give this one timer instance a writable callback slot and an on_timer
	# hook that fires it.
	class << timer
		attr_writer :block

		def on_timer
			@block.call
		end
	end

	# The lambda closes over the consumer, so +poll+ runs on it rather than
	# on the timer.
	timer.block = lambda { poll }

	event_loop = Rev::Loop.new
	timer.attach(event_loop)
	event_loop.run
end