Class: Promiscuous::Subscriber::Worker
- Inherits:
-
Object
- Object
- Promiscuous::Subscriber::Worker
show all
- Includes:
- Common::Worker
- Defined in:
- lib/promiscuous/subscriber/worker.rb
Instance Method Summary
collapse
Methods included from Common::Worker: #bareback?, #initialize, #unit_of_work
Instance Method Details
#queue_bindings ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/promiscuous/subscriber/worker.rb', line 33
# Builds the arguments describing which queue this worker consumes.
#
# The queue name is "<app>.promiscuous" and the exchange name is
# Promiscuous::AMQP::EXCHANGE. When options[:personality] is set, both names
# get ".<personality>" appended.
#
# @return [Hash] with :exchange_name, :queue_name and :bindings, where
#   :bindings holds the keys of Promiscuous::Subscriber::AMQP.subscribers
def queue_bindings
  base_names = ["#{Promiscuous::Config.app}.promiscuous", Promiscuous::AMQP::EXCHANGE]

  personality = options[:personality]
  suffix = personality ? ".#{personality}" : nil

  # `+` returns a new string, so the EXCHANGE constant is never mutated.
  queue_name, exchange_name = base_names.map { |base| suffix ? base + suffix : base }

  {
    :exchange_name => exchange_name,
    :queue_name => queue_name,
    :bindings => Promiscuous::Subscriber::AMQP.subscribers.keys
  }
end
|
#replicate ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/promiscuous/subscriber/worker.rb', line 4
# Opens the queue described by #queue_bindings and subscribes to it with
# explicit acknowledgements.
#
# For each delivered message, unless the worker is stopped (`self.stop`):
# the payload is logged, parsed as JSON, and handed to
# Promiscuous::Subscriber.process inside #unit_of_work (keyed by the
# payload's '__amqp__' entry); the message is then acknowledged. A message
# received while stopped is neither processed nor acknowledged.
#
# On any exception the error is wrapped in Promiscuous::Subscriber::Error.
# In bareback mode the message is acknowledged (skipped); otherwise the
# worker is marked stopped and the AMQP connection is closed. In both cases
# the failure is logged and passed to Promiscuous::Config.error_handler
# (when one is configured), even if the ack/disconnect itself raises.
def replicate
  Promiscuous::AMQP.open_queue(queue_bindings) do |queue|
    queue.subscribe :ack => true do |metadata, payload|
      begin
        unless self.stop
          Promiscuous.info "[receive] #{payload}"
          parsed_payload = JSON.parse(payload)
          # Use a distinct name: assigning to `queue` here would overwrite the
          # enclosing block's queue object, since closures share the variable.
          source_queue = parsed_payload['__amqp__']
          self.unit_of_work(source_queue) { Promiscuous::Subscriber.process(parsed_payload) }
          metadata.ack
        end
      rescue Exception => e
        # NOTE(review): rescuing Exception also traps SignalException and
        # SystemExit. Kept as-is because narrowing it would change which
        # failures stop the worker -- confirm whether StandardError suffices.
        e = Promiscuous::Subscriber::Error.new(e, payload)
        skipping = bareback?
        begin
          if skipping
            metadata.ack
          else
            self.stop = true
            Promiscuous::AMQP.disconnect
          end
        ensure
          # Report in `ensure` so a failing ack/disconnect cannot hide the
          # original error. Array() guards against a wrapper error that has
          # no backtrace (it is constructed here, never raised).
          Promiscuous.error "[receive] FATAL #{"skipping " if skipping}#{e} #{Array(e.backtrace).join("\n")}"
          # `try` presumably comes from ActiveSupport: no-op when no handler is set.
          Promiscuous::Config.error_handler.try(:call, e)
        end
      end
    end
  end
end
|