Class: Hivent::Redis::Consumer

Inherits:
Object
  • Object
show all
Includes:
Extensions
Defined in:
lib/hivent/redis/consumer.rb

Constant Summary collapse

LUA_CONSUMER =
File.expand_path("../lua/consumer.lua", __FILE__)
LUA_HEARTBEAT =
File.expand_path("../lua/heartbeat.lua", __FILE__)
SLEEP_TIME =

In milliseconds

200
CONSUMER_TTL =
1000

Constants included from Extensions

Extensions::LUA_CACHE

Instance Method Summary collapse

Methods included from Extensions

#script

Constructor Details

#initialize(redis, service_name, name, life_cycle_event_handler) ⇒ Consumer

Returns a new instance of Consumer.



16
17
18
19
20
21
22
# File 'lib/hivent/redis/consumer.rb', line 16

def initialize(redis, service_name, name, life_cycle_event_handler)
  @redis                    = redis
  @service_name             = service_name
  @name                     = name
  @stop                     = false
  @life_cycle_event_handler = life_cycle_event_handler
end

Instance Method Details

#consumeObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/hivent/redis/consumer.rb', line 38

def consume
  to_process = items

  to_process.each do |(queue, item)|
    payload = nil
    begin
      payload = JSON.parse(item).with_indifferent_access

      Hivent.emitter.broadcast(payload)

      @life_cycle_event_handler.event_processing_succeeded(event_name(payload), event_version(payload), payload)
    rescue => e
      @redis.lpush(dead_letter_queue_name(queue), item)

      @life_cycle_event_handler.event_processing_failed(e, payload, item, dead_letter_queue_name(queue))
    ensure
      @redis.rpop(queue)
    end
  end

  Kernel.sleep(SLEEP_TIME.to_f / 1000) if to_process.empty?
end

#queuesObject



34
35
36
# File 'lib/hivent/redis/consumer.rb', line 34

def queues
  script(LUA_CONSUMER, @service_name, @name, CONSUMER_TTL) || []
end

#run!Object



24
25
26
27
# File 'lib/hivent/redis/consumer.rb', line 24

def run!
  start_heartbeat!
  consume while !@stop
end

#stop!Object



29
30
31
32
# File 'lib/hivent/redis/consumer.rb', line 29

def stop!
  @stop = true
  stop_heartbeat!
end