Class: Hivent::Redis::Consumer
- Inherits: Object
- Hierarchy: Object > Hivent::Redis::Consumer
- Includes: Extensions
- Defined in: lib/hivent/redis/consumer.rb
Constant Summary
- LUA_CONSUMER = File.expand_path("../lua/consumer.lua", __FILE__)
- LUA_HEARTBEAT = File.expand_path("../lua/heartbeat.lua", __FILE__)
- SLEEP_TIME = 200 (in milliseconds)
- CONSUMER_TTL = 1000
Constants included from Extensions
Instance Method Summary
- #consume ⇒ Object
- #initialize(redis, service_name, name, life_cycle_event_handler) ⇒ Consumer (constructor): a new instance of Consumer.
- #queues ⇒ Object
- #run! ⇒ Object
- #stop! ⇒ Object
Methods included from Extensions
Constructor Details
#initialize(redis, service_name, name, life_cycle_event_handler) ⇒ Consumer
Returns a new instance of Consumer.
    # File 'lib/hivent/redis/consumer.rb', line 16
    def initialize(redis, service_name, name, life_cycle_event_handler)
      @redis = redis
      @service_name = service_name
      @name = name
      @stop = false
      @life_cycle_event_handler = life_cycle_event_handler
    end
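The constructor stores whatever object is passed as `life_cycle_event_handler`. Judging only from the calls made in `#consume`, that object needs to respond to `event_processing_succeeded` and `event_processing_failed`. The following is a minimal sketch of such a handler; the class name `RecordingLifeCycleHandler`, the sample event values, and the choice to record calls in arrays are assumptions made for illustration, not part of Hivent.

```ruby
# Hypothetical handler: the class name and the recorded tuples are
# illustrative only. The two method signatures mirror the calls in #consume.
class RecordingLifeCycleHandler
  attr_reader :succeeded, :failed

  def initialize
    @succeeded = []
    @failed = []
  end

  # Called after an event was parsed and broadcast without raising.
  def event_processing_succeeded(event_name, event_version, payload)
    @succeeded << [event_name, event_version, payload]
  end

  # Called when processing raised; payload is nil if parsing itself failed.
  def event_processing_failed(error, payload, raw_item, dead_letter_queue)
    @failed << [error.class.name, payload, raw_item, dead_letter_queue]
  end
end

# Exercise the handler directly with made-up values.
handler = RecordingLifeCycleHandler.new
handler.event_processing_succeeded("user:created", 1, { "id" => 42 })
handler.event_processing_failed(StandardError.new("boom"), nil, "not json", "dead_letter_queue")
```

An object like this would be passed as the fourth constructor argument, alongside a Redis client, a service name, and a consumer name.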
Instance Method Details
#consume ⇒ Object
    # File 'lib/hivent/redis/consumer.rb', line 38
    def consume
      to_process = items

      to_process.each do |(queue, item)|
        payload = nil
        begin
          payload = JSON.parse(item).with_indifferent_access

          Hivent.emitter.broadcast(payload)

          @life_cycle_event_handler.event_processing_succeeded(event_name(payload), event_version(payload), payload)
        rescue => e
          @redis.lpush(dead_letter_queue_name(queue), item)

          @life_cycle_event_handler.event_processing_failed(e, payload, item, dead_letter_queue_name(queue))
        ensure
          @redis.rpop(queue)
        end
      end

      Kernel.sleep(SLEEP_TIME.to_f / 1000) if to_process.empty?
    end
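The `begin`/`rescue`/`ensure` structure in `#consume` means an item is always popped from its queue, and is additionally pushed to a dead letter queue when processing raises. The sketch below reproduces only that pattern against an in-memory stand-in. `FakeRedis`, `process_queue`, and the queue names are hypothetical, and reading the next item with `lindex(queue, -1)` is an assumption, since the `items` helper is not shown on this page.

```ruby
require "json"

# Hypothetical in-memory stand-in for the few Redis list commands used here.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Prepend a value, like Redis LPUSH; returns the new list length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # Remove and return the last value, like Redis RPOP.
  def rpop(key)
    @lists[key].pop
  end

  # Read a value by index without removing it, like Redis LINDEX.
  def lindex(key, index)
    @lists[key][index]
  end

  def llen(key)
    @lists[key].length
  end
end

# Process every item in the queue: parse it, dead-letter it on failure,
# and pop it from the queue in either case.
def process_queue(redis, queue, dead_letter_queue)
  handled = []
  while (item = redis.lindex(queue, -1))
    begin
      handled << JSON.parse(item)
    rescue JSON::ParserError
      redis.lpush(dead_letter_queue, item)
    ensure
      redis.rpop(queue)
    end
  end
  handled
end

redis = FakeRedis.new
redis.lpush("events", '{"name":"user:created"}')
redis.lpush("events", "not json")
handled = process_queue(redis, "events", "events:dead_letter")
```

Popping in `ensure` keeps a malformed item from blocking the queue, at the cost of relying on the dead letter queue as the only record of the failure.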
#queues ⇒ Object
    # File 'lib/hivent/redis/consumer.rb', line 34
    def queues
      script(LUA_CONSUMER, @service_name, @name, CONSUMER_TTL) || []
    end
#run! ⇒ Object
    # File 'lib/hivent/redis/consumer.rb', line 24
    def run!
      start_heartbeat!
      consume while !@stop
    end
#stop! ⇒ Object
    # File 'lib/hivent/redis/consumer.rb', line 29
    def stop!
      @stop = true
      stop_heartbeat!
    end
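Because `#run!` loops until `@stop` is set, it blocks the calling thread, and `#stop!` has to be called from elsewhere. One possible way to drive such a loop is from a separate thread, sketched below with a hypothetical `LoopingWorker` class that mirrors only the flag-based loop. The heartbeat calls are omitted because their implementation lives in `Extensions` and is not shown here.

```ruby
# Hypothetical worker that mirrors the run!/stop! flag pattern only.
class LoopingWorker
  attr_reader :iterations

  def initialize
    @stop = false
    @iterations = 0
  end

  # Blocks until stop! is called from another thread.
  def run!
    work until @stop
  end

  def stop!
    @stop = true
  end

  private

  # Stand-in for one unit of work; sleeps briefly like an idle poll would.
  def work
    @iterations += 1
    sleep 0.01
  end
end

worker = LoopingWorker.new
thread = Thread.new { worker.run! }

# Wait until the loop has run at least once, then ask it to stop.
sleep 0.01 until worker.iterations > 0
worker.stop!

# join returns the thread if it finished within the timeout, nil otherwise.
finished = thread.join(2)
```

The loop checks the flag only between iterations, so a stop request takes effect after the current unit of work completes.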