5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/logstash-cli/command/tail.rb', line 5
# Connects to an AMQP broker, binds a queue to the configured exchange and
# prints one formatted line per received message until interrupted.
#
# Connection settings come from options[:url] when it is non-nil; otherwise a
# settings hash is built from :host, :vhost, :port, :user, :password and :ssl.
# Each payload is parsed as JSON, and for every name listed in the
# comma-separated options[:meta] string the value stored under "@<name>" is
# collected (nil when the key is absent). The collected values are handed to
# _format together with the full options hash, and the result is printed.
#
# @param options [Hash] reads :url, :host, :vhost, :port, :user, :password,
#   :ssl, :exchange, :key, :autodelete, :persistent, :durable and :meta;
#   the whole hash is also forwarded to _format
# @return [Object] whatever AMQP.start returns; on any StandardError the
#   error is printed and the process exits with status -1
def _tail(options)
  amqp_url = options[:url]

  # A nil :meta is treated as "no fields". Calling split directly on nil would
  # raise NoMethodError here, outside the begin/rescue below, so the error
  # would bypass this method's own error reporting.
  metafields = options[:meta].to_s.split(',')

  begin
    settings =
      if amqp_url.nil?
        { :host => options[:host], :vhost => options[:vhost], :port => options[:port],
          :user => options[:user], :password => options[:password],
          :ssl => options[:ssl] }
      else
        amqp_url
      end

    AMQP.start(settings) do |connection, _open_ok|
      # On SIGINT: announce, ask the connection to close (stopping
      # EventMachine from the close callback), then exit the process.
      trap("INT") do
        puts "Shutting down..."
        connection.close { EM.stop }
        exit
      end

      channel = AMQP::Channel.new(connection, :auto_recovery => true)
      channel.queue("", :auto_delete => options[:autodelete], :persistent => options[:persistent], :durable => options[:durable]) do |queue, _declare_ok|
        queue.bind(options[:exchange], :routing_key => options[:key])
        queue.subscribe do |payload|
          parsed_message = JSON.parse(payload)
          fields = metafields.map { |metafield| parsed_message["@#{metafield}"] }
          puts _format(fields, options)
        end
      end
    end
  rescue AMQP::PossibleAuthenticationFailureError => ex
    puts "Possible Authentication error:\nthe AMQP connection URL used is #{amqp_url}\n\nDetail Info:\n#{ex}"
    exit(-1)
  rescue StandardError => ex
    puts "Error occurred: #{ex}"
    exit(-1)
  end
end
|