25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/amqpop/cli.rb', line 25
def start
begin
EventMachine.threadpool_size = options[:num_children]
EventMachine.run do
Signal.trap("INT") { shutdown }
Signal.trap("TERM") { shutdown }
vputs "Running #{AMQP::VERSION} version of the AMQP gem."
vputs "Connecting to AMQP broker on #{connection_params[:host]} as #{connection_params[:username]}."
AMQP.connect(connection_params) do |connection|
Amqpop.connection = connection
connection.on_tcp_connection_loss do |cl, settings|
eputs "Connection to AMQP broker lost!"
eputs "Waiting 2 seconds to attempt to connect..."
cl.reconnect(true, 2)
end
connection.on_open do
vputs "Connection established"
end
connection.on_recovery do
eputs "Connection reestablished"
end
AMQP::Channel.new(connection, :auto_recovery => true) do |channel|
channel.on_error do |ch, close|
eputs "ERROR: Channel-level exception: #{close.reply_text}, #{close.inspect}"
connection.close {
shutdown
}
end
if options[:wait] == 0
vputs "No timeout set, process will stay running"
else
vputs "Timeout of #{options[:wait]} seconds set"
end
queue = get_queue(channel)
queue.once_declared do
vputs "Connected to queue: #{queue.name}"
vputs "Ack mode: #{Amqpop.require_ack? ? 'explicit' : 'auto'}"
end
bind_queue(queue)
queue.subscribe(:confirm => proc{ wait_exit_timer }, :ack => Amqpop.require_ack?) do |meta, payload|
cancel_wait_exit_timer
m = Message.new(payload, meta)
vputs "Received a message: #{payload}. Executing..."
EventMachine.defer(m.command_proc, m.callback_proc)
wait_exit_timer
end
end
end
end
rescue => e
eputs "ERROR: #{e.class} - #{e.message}"
exit 2
end
end
|