3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
# File 'lib/bunnish/command/publish.rb', line 3
def self.run(argv, input_stream=$stdin, output_stream=$stdout)
params = Bunnish.parse_opts(argv)
host = params[:host]
port = params[:port]
user = params[:user]
password = params[:password]
durable = params[:durable]
exchange_name = params[:exchange_name]
unit_size = params[:unit_size] || 100000
raise_exception_flag = params[:raise_exception_flag]
delimiter = params[:delimiter]
log_label = params[:log_label]
log_dir = params[:log_dir]
log_path = params[:log_path]
queue_name_list = argv.shift
if queue_name_list.nil?
Bunnish.logger.error("queue-name is not set")
return 1
end
queue_name_list = queue_name_list.split(/[, \r\n]/)
queue_name_list.delete('')
if delimiter
delimiter_crlf = "#{delimiter}\r\n"
delimiter_lf = "#{delimiter}\n"
end
log_streams = {}
queue_name_list.each do |queue_name|
log_path = "#{log_dir}/#{queue_name.gsub(/[\/]/, "_")}.log" if log_dir
if log_path then
log_stream = open(log_path, "a")
log_streams[queue_name] = log_stream
Bunnish.logger.info "#{log_label} output log into #{log_path}"
end
end
fanout_flag = (exchange_name && exchange_name != '' && 1 < queue_name_list.size)
bunny = nil
publish_flag = false
exchange_list = []
begin
count = 0
lines = []
while line = input_stream.gets do
if bunny == nil then
bunny = Bunny.new(:logging => false, :spec => '09', :host=>host, :port=>port, :user=>user, :pass=>password)
bunny.start
if fanout_flag then
fanout_exchange = Bunnish::Core::Publish.create_fanout_exchange(bunny, queue_name_list, log_streams, params)
exchange_list.push fanout_exchange
else
direct_exchange_list = queue_name_list.map {|queue_name|
Bunnish::Core::Publish.create_direct_exchange(bunny, queue_name, log_streams, params)
}
exchange_list = direct_exchange_list
end
queue_name_list.each do |queue_name|
queue = bunny.queue(queue_name, :durable=>durable)
message = "#{log_label} publish to #{queue_name}(#{queue.status[:message_count]} messages, #{queue.status[:consumer_count]} consumers)"
Bunnish::Core::Common.output_log [log_streams[queue_name]], "INFO", message
end
end
if delimiter
lines.push line
next if /^#{Regexp.escape(delimiter)}\r?\n$/ !~ line
message = lines.join
lines.clear
else
message = line
end
exchange_list.each do |exchange|
exchange.publish(message)
end
count += 1
if unit_size <= count then
publish_flag = true
queue_name_list.each do |queue_name|
queue = bunny.queue(queue_name, :durable=>durable)
log_stream = log_streams[queue_name]
Bunnish::Core::Publish.output_publish_log [log_stream], queue, count, log_label
end
count = 0
end
end
publish_flag = true if 0 < count
queue_name_list.each do |queue_name|
log_stream = log_streams[queue_name]
if bunny then
queue = bunny.queue(queue_name, :durable=>durable)
Bunnish::Core::Publish.output_publish_log [log_stream], queue, count, log_label if 0 < count || !publish_flag
else
Bunnish::Core::Common.output_log [log_stream], "INFO", "#{log_label} no input for #{queue_name}"
end
end
bunny.stop if bunny
rescue Exception=>e
if raise_exception_flag then
bunny.stop if bunny
raise e if raise_exception_flag
else
message = "#{log_label} #{e.message}(#{e.class.name}): #{e.backtrace.map{|s| " #{s}"}.join("\n")}"
Bunnish::Core::Common.output_log(log_streams.values, "EXCEPTION", message)
end
end
log_streams.values.each do |log_stream|
log_stream.close
end
return 0
end
|