5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/viki/queue/runner.rb', line 5
def run(queue, router, config={})
config = {iterations: 1, fail_pause: 10}.merge(config)
begin
EventMachine.run do
connection = AMQP.connect({
host: Viki::Queue.host,
port: Viki::Queue.port,
username: Viki::Queue.username,
password: Viki::Queue.password})
channel = AMQP::Channel.new(connection)
loops = 0
channel.prefetch(1).queue(queue, :durable => true).subscribe(:ack => true) do |metadata, message|
processed = false
for i in 1..10 do
begin
payload = Oj.load(message, symbol_keys: true)
if payload[:_meta]
payload[:_meta][:timestamp] = metadata.timestamp
else
payload[:_meta] = {timestamp: metadata.timestamp}
end
if process(router, payload) == true
processed = true
metadata.ack
break
end
rescue => e
router.error(e)
end
sleep(config[:fail_pause])
end
unless processed
router.error("Failed to process message: #{message}")
connection.close { EventMachine.stop }
end
loops += 1
if loops == config[:iterations]
connection.close { EventMachine.stop }
end
end
end
rescue Interrupt
puts "Queue is interrupt. Good night!"
end
end
|