3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/bunnish/command/subscribe.rb', line 3
# Subscribes to a queue and writes every received message payload to
# +output_stream+, one payload per line.
#
# Options are taken from +Bunnish.parse_opts(argv)+; the first element left
# in +argv+ afterwards is used as the queue name (and as the name of the
# exchange the queue is bound to).
#
# Option keys read from the parsed params:
# * +:host+, +:port+, +:user+, +:password+ - passed to +Bunny.new+
# * +:durable+ - passed to +bunny.queue+
# * +:ack+, +:consumer_tag+, +:exclusive+, +:timeout+ - passed to +queue.subscribe+
# * +:message_max+ - number of messages to subscribe; the string
#   'current-size' means "the number of messages currently in the queue"
# * +:current_all_flag+ - when set, behaves like +:message_max+ = 'current-size'
# * +:min_size+ - when set (and 'current-size' is not in effect), only
#   <tt>remain_count - min_size</tt> messages (never fewer than 0) are subscribed
# * +:unit_size+ - a progress line is logged every +unit_size+ messages (default 10000)
# * +:retry_max_count+ - how many times a failed subscribe is retried
#   (falls back to 0 when the parsed params hold no value)
# * +:weight_second+ - seconds slept before each retry
#   (falls back to 0 when the parsed params hold no value)
# * +:raise_exception_flag+ - when set, the error is re-raised once retries are
#   exhausted; otherwise it is only logged as a warning
# * +:log_label+ - prefix of every log line
# * +:log_dir+ / +:log_path+ - where the log file is appended; +:log_dir+ wins
#   and derives the file name from the queue name ("/" replaced by "_")
#
# @param argv [Array<String>] command line arguments (mutated: the queue name is shifted off)
# @param input_stream [IO] not used by this command
# @param output_stream [#puts] receives the message payloads
# @return [Integer] 1 when no queue name was given, 0 otherwise
# @raise [StandardError] the subscribe error, when +:raise_exception_flag+ is
#   set and the retries are exhausted
def self.run(argv, input_stream=$stdin, output_stream=$stdout)
  params = Bunnish.parse_opts(argv)
  host = params[:host]
  port = params[:port]
  user = params[:user]
  password = params[:password]
  durable = params[:durable]
  unit_size = params[:unit_size] || 10000
  # Defensive fallbacks: a nil here would otherwise blow up inside the rescue
  # handler below and hide the original error.
  weight_second = params[:weight_second] || 0
  retry_max_count = params[:retry_max_count] || 0
  raise_exception_flag = params[:raise_exception_flag]
  ack = params[:ack]
  consumer_tag = params[:consumer_tag]
  exclusive = params[:exclusive]
  message_max = params[:message_max]
  timeout = params[:timeout]
  current_all_flag = params[:current_all_flag]
  min_size = params[:min_size]
  log_label = params[:log_label]
  log_dir = params[:log_dir]
  log_path = params[:log_path]

  queue_name = argv.shift
  if queue_name.nil?
    Bunnish.logger.error("queue-name is not set")
    return 1
  end

  log_stream = nil
  bunny = nil
  connected = false
  begin
    log_path = "#{log_dir}/#{queue_name.gsub(/[\/]/, "_")}.log" if log_dir
    if log_path
      # File.open (not Kernel#open): a path starting with "|" must never be
      # interpreted as a command to spawn.
      log_stream = File.open(log_path, "a")
      Bunnish.logger.info "#{log_label} output log into #{log_path}"
    end

    exchange_name = queue_name

    bunny = Bunny.new(:logging => false, :spec => '09', :host=>host, :port=>port, :user=>user, :pass=>password)
    bunny.start
    connected = true
    bunny.qos(:prefetch_count => 1)

    queue = bunny.queue(queue_name, :durable=>durable)

    # Read both counters from a single status snapshot.
    status = queue.status
    remain_count = status[:message_count]
    consumer_count = status[:consumer_count]

    message_max = 'current-size' if current_all_flag
    if message_max == 'current-size'
      message_max = remain_count
    elsif min_size
      message_max = [remain_count - min_size, 0].max
    elsif message_max
      message_max = message_max.to_i
    end

    if message_max
      Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} subscribe #{message_max} messages from #{queue_name}(#{remain_count} messages, #{consumer_count} consumers)"
      if message_max <= 0
        Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} finished"
        # Cleanup (log file, connection) happens in the ensure clause.
        return 0
      end
    else
      Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} subscribe to #{queue_name}(#{remain_count} messages, #{consumer_count} consumers)"
    end

    unless exchange_name.empty?
      exchange = bunny.exchange(exchange_name)
      queue.bind(exchange)
    end

    total_count = 0 # messages written since the start
    count = 0       # messages written since the last progress line

    if remain_count == 0
      Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} no messages in #{queue_name}(#{remain_count} messages, #{consumer_count} consumers)"
    else
      retry_count = 0
      begin
        queue.subscribe(:ack=>ack,
                        :consumer_tag=>consumer_tag,
                        :exclusive=>exclusive,
                        :message_max=>message_max,
                        :timeout=>timeout) do |msg|
          if msg && msg[:payload]
            output_stream.puts msg[:payload]
            count += 1
            total_count += 1
            if unit_size <= count
              Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} subscribed #{count} messages from #{queue_name}"
              count = 0
              # Stop once only min_size messages (or fewer) would remain.
              break if min_size && remain_count <= total_count + min_size
            end
          end
          # A delivered message proves the subscription works again.
          retry_count = 0
        end
      rescue StandardError => e
        # StandardError only: Interrupt, SystemExit and the like must reach
        # the caller instead of being retried or swallowed.
        if retry_count < retry_max_count
          Bunnish.logger.warn("(EXCEPTION)#{log_label} #{e.message}(#{e.class.name}): #{e.backtrace.map{|s| " #{s}"}.join("\n")}")
          Bunnish.logger.warn("#{log_label} retry(#{retry_count})")
          retry_count += 1
          sleep(weight_second)
          retry
        elsif raise_exception_flag
          raise
        else
          Bunnish.logger.warn("(EXCEPTION)#{log_label} #{e.message}(#{e.class.name}): #{e.backtrace.map{|s| " #{s}"}.join("\n")}")
        end
      end
    end

    Bunnish::Core::Subscribe.output_subscribe_log [log_stream], queue, count, log_label
    0
  ensure
    # Runs on every exit path after the queue name check: normal completion,
    # the early "finished" return, and any raised error.
    log_stream.close if log_stream && !log_stream.closed?
    bunny.stop if connected
  end
end
|