Module: Emissary::Operator::AMQP
- Defined in:
- lib/emissary/operator/amqp.rb
Defined Under Namespace
Classes: InvalidConfig, InvalidExchange
Constant Summary
collapse
- REQUIRED_KEYS =
[ :uri, :subscriptions ]
- VALID_EXCHANGES =
[ :headers, :topic, :direct, :fanout ]
- @@queue_count =
1
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#not_acked ⇒ Object
Returns the value of attribute not_acked.
33
34
35
|
# File 'lib/emissary/operator/amqp.rb', line 33
def not_acked
@not_acked
end
|
#subscriptions ⇒ Object
Returns the value of attribute subscriptions.
32
33
34
|
# File 'lib/emissary/operator/amqp.rb', line 32
def subscriptions
@subscriptions
end
|
Instance Method Details
#acknowledge(message) ⇒ Object
163
164
165
166
167
168
169
170
171
172
173
174
|
# File 'lib/emissary/operator/amqp.rb', line 163
def acknowledge message
unless message.kind_of? Emissary::Message
Emissary.logger.warning "Can't acknowledge message not deriving from Emissary::Message class"
end
@not_acked.delete(message.uuid).ack
Emissary.logger.debug "Acknowledged Message ID: #{message.uuid}"
rescue NoMethodError
Emissary.logger.warning "Message with UUID #{message.uuid} not acknowledged."
rescue Exception => e
Emissary.logger.error "Error in Emissary::Operator::AMQP#acknowledge: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
|
188
189
190
191
192
193
194
195
196
197
198
199
|
# File 'lib/emissary/operator/amqp.rb', line 188
def close
sleep 1 unsubscribe
::AMQP.stop
end
|
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/emissary/operator/amqp.rb', line 84
def connect
if @connect_details[:ssl] and not EM.ssl?
raise ::Emissary::Error::ConnectionError ,
"Requested SSL connection but EventMachine not compiled with SSL support - quitting!"
end
@message_pool = Queue.new
@connection = ::AMQP.connect(@connect_details)
@channel = ::MQ.new(@connection)
@queue_config = {
:durable => @config[:queue_durable].nil? ? false : @config[:queue_durable],
:auto_delete => @config[:queue_auto_delete].nil? ? true : @config[:queue_auto_delete],
:exclusive => @config[:queue_exclusive].nil? ? true : @config[:queue_exclusive]
}
@queue = ::MQ::Queue.new(@channel, @queue_name, @queue_config)
@exchanges = {}
@exchanges[:topic] = ::MQ::Exchange.new(@channel, :topic, 'amq.topic')
@exchanges[:fanout] = ::MQ::Exchange.new(@channel, :fanout, 'amq.fanout')
@exchanges[:direct] = ::MQ::Exchange.new(@channel, :direct, 'amq.direct')
true
end
|
#post_init ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/emissary/operator/amqp.rb', line 55
def post_init
uri = ::URI.parse @config[:uri]
ssl = (uri.scheme.to_sym == :amqps)
@connect_details = {
:host => uri.host,
:ssl => ssl,
:user => (::URI.decode(uri.user) rescue nil) || 'guest',
:pass => (::URI.decode(uri.password) rescue nil) || 'guest',
:vhost => (! uri.path.empty? ? uri.path : '/nimbul'),
:port => uri.port || (ssl ? 5671 : 5672),
:logging => !!@config[:debug],
}
@subscriptions = @config[:subscriptions].inject({}) do |hash,queue|
key, type = queue.split(':')
type = type.nil? ? DEFAULT_EXCHANGE : (VALID_EXCHANGES.include?(type.to_sym) ? type.to_sym : DEFAULT_EXCHANGE)
(hash[type] ||= []) << key
hash
end
@queue_name = "#{Emissary.identity.queue_name}.#{@@queue_count}"
@@queue_count += 1
@not_acked = {}
end
|
#reject(message, opts = { :requeue => true }) ⇒ Object
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/emissary/operator/amqp.rb', line 176
def reject message, opts = { :requeue => true }
return true unless message.kind_of? Emissary::Message
Emissary.logger.warning "Unable to reject message not deriving from Emissary::Message class"
end
@not_acked.delete(message.uuid).reject(opts)
Emissary.logger.debug "Rejected Message ID: #{message.uuid}"
rescue Exception => e
Emissary.logger.error "Error in AMQP::Reject: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
|
#send_data(msg) ⇒ Object
153
154
155
156
157
158
159
160
161
|
# File 'lib/emissary/operator/amqp.rb', line 153
def send_data msg
begin
Emissary.logger.debug "Sending message through exchange '#{msg.exchange_type.to_s}' with routing key '#{msg.routing_key}'"
Emissary.logger.debug "Message Originator: #{msg.originator} - Recipient: #{msg.recipient}"
@exchanges[msg.exchange_type].publish msg.stamp_sent!.encode, :routing_key => msg.routing_key
rescue NoMethodError
raise InvalidExchange, "publish request on invalid exchange '#{msg.exchange_type}' with routing key '#{msg.routing_key}'"
end
end
|
201
202
|
# File 'lib/emissary/operator/amqp.rb', line 201
def status
end
|
#subscribe ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/emissary/operator/amqp.rb', line 111
def subscribe
@subscriptions.each do |exchange, keys|
keys.map do |routing_key|
Emissary.logger.debug "Subscribing To Key: '#{routing_key}' on Exchange '#{exchange}'"
@queue.bind(@exchanges[exchange], :key => routing_key)
end
end
@queue.bind(@exchanges[:direct], :key => Emissary.identity.queue_name)
@queue.subscribe(:ack => true) do |info, message|
begin
message = Emissary::Message.decode(message).stamp_received!
rescue ::Emissary::Error::InvalidMessageFormat => e
message = Emissary::Message.new
message.errors << e
end
@not_acked[message.uuid] = info
Emissary.logger.debug "Received through '#{info.exchange}' and routing key '#{info.routing_key}'"
receive message
end
end
|
#unsubscribe ⇒ Object
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/emissary/operator/amqp.rb', line 138
def unsubscribe
@subscriptions.each do |exchange, keys|
keys.map do |routing_key|
Emissary.logger.info "Unsubscribing from '#{routing_key}' on Exchange '#{exchange}'"
@queue.unbind(@exchanges[exchange], :key => routing_key)
end
end
Emissary.logger.info "Unsubscribing from my own queue."
@queue.unbind(@exchanges[:direct], :key => Emissary.identity.queue_name)
Emissary.logger.info "Cancelling all subscriptions."
@queue.unsubscribe end
|
#validate_config! ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/emissary/operator/amqp.rb', line 37
def validate_config!
errors = []
errors << 'config not a hash!' unless config.instance_of? Hash
REQUIRED_KEYS.each do |key|
errors << "missing required option '#{key}'" unless config.has_key? key
end
u = ::URI.parse(config[:uri])
errors << "URI scheme /must/ be one of 'amqp' or 'amqps'" unless !!u.scheme.match(/^amqps{0,1}$/)
[ :user, :password, :host, :path ].each do |v|
errors << "invalid value 'nil' for URI part [#{v}]" if u.respond_to? v and u.send(v).nil?
end
raise errors.join("\n") unless errors.empty?
return true
end
|