Module: Viki::Queue::Runner

Included in:
Service
Defined in:
lib/viki/queue/runner.rb

Instance Method Summary

Instance Method Details

#run(queue, router, config = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/viki/queue/runner.rb', line 5

# Subscribes to +queue+ over AMQP inside an EventMachine reactor and passes
# each decoded message to +process+ (expected to be supplied by the including
# class), retrying a failing message a bounded number of times.
#
# Each message is parsed with Oj (symbolized keys) and gets the delivery
# timestamp stored under <tt>payload[:_meta][:timestamp]</tt>. A message is
# acknowledged only when +process+ returns exactly +true+. If every attempt
# fails, the failure is reported through +router.error+ and the connection is
# closed, which stops the reactor. Otherwise the reactor is stopped once
# +config[:iterations]+ messages have been handled.
#
# @param queue [String] name of the durable queue to consume from
# @param router [#error] handed to +process+; also receives error reports
# @param config [Hash] overrides for the defaults below
# @option config [Integer] :iterations (1) messages to handle before stopping
# @option config [Numeric] :fail_pause (10) seconds to wait between attempts
# @option config [Integer] :attempts (10) maximum tries per message
# @return [Object] whatever EventMachine.run returns, or nil when interrupted
def run(queue, router, config = {})
  config = { iterations: 1, fail_pause: 10, attempts: 10 }.merge(config)

  begin
    EventMachine.run do
      connection = AMQP.connect({
        host: Viki::Queue.host,
        port: Viki::Queue.port,
        username: Viki::Queue.username,
        password: Viki::Queue.password
      })
      channel = AMQP::Channel.new(connection)
      handled = 0

      # NOTE(review): the first block parameter's name was missing from the
      # extracted listing; it is restored here as `metadata`.
      channel.prefetch(1).queue(queue, :durable => true).subscribe(:ack => true) do |metadata, message|
        processed = false

        config[:attempts].times do |attempt|
          # Pause only between attempts, never after the last one.
          # NOTE(review): Kernel#sleep blocks the calling thread, which here is
          # presumably the EventMachine reactor thread -- confirm this is
          # acceptable or switch to a reactor timer.
          sleep(config[:fail_pause]) if attempt > 0

          begin
            # Re-parse on every attempt so each try starts from a fresh payload.
            payload = Oj.load(message, symbol_keys: true)
            (payload[:_meta] ||= {})[:timestamp] = metadata.timestamp

            # Strict comparison kept on purpose: only a literal `true` counts
            # as success, other truthy results are treated as failures.
            if process(router, payload) == true
              processed = true
              metadata.ack
              break
            end
          rescue => e
            router.error(e)
          end
        end

        router.error("Failed to process message: #{message}") unless processed

        handled += 1
        # Request shutdown exactly once: either the message could not be
        # processed or the configured number of messages has been handled.
        if !processed || handled == config[:iterations]
          connection.close { EventMachine.stop }
        end
      end
    end
  rescue Interrupt
    puts "Queue is interrupt. Good night!"
  end
end