Class: Promiscuous::Subscriber::Worker

Inherits:
Object
  • Object
show all
Includes:
Common::Worker
Defined in:
lib/promiscuous/subscriber/worker.rb

Instance Method Summary collapse

Methods included from Common::Worker

#bareback?, #initialize, #unit_of_work

Instance Method Details

#queue_bindingsObject



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/promiscuous/subscriber/worker.rb', line 33

def queue_bindings
  queue_name = "#{Promiscuous::Config.app}.promiscuous"
  exchange_name = Promiscuous::AMQP::EXCHANGE

  if options[:personality]
    queue_name    += ".#{options[:personality]}"
    exchange_name += ".#{options[:personality]}"
  end

  bindings = Promiscuous::Subscriber::AMQP.subscribers.keys
  {:exchange_name => exchange_name, :queue_name => queue_name, :bindings => bindings}
end

#replicateObject



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/promiscuous/subscriber/worker.rb', line 4

def replicate
  Promiscuous::AMQP.open_queue(queue_bindings) do |queue|
    queue.subscribe :ack => true do |, payload|
      # Note: This code always runs on the root Fiber,
      # so ordering is always preserved
      begin
        unless self.stop
          Promiscuous.info "[receive] #{payload}"
          parsed_payload = JSON.parse(payload)
          queue = parsed_payload['__amqp__']
          self.unit_of_work(queue) { Promiscuous::Subscriber.process(parsed_payload) }
          .ack
        end
      rescue Exception => e
        e = Promiscuous::Subscriber::Error.new(e, payload)

        if bareback?
          .ack
        else
          self.stop = true
          Promiscuous::AMQP.disconnect
        end
        Promiscuous.error "[receive] FATAL #{"skipping " if bareback?}#{e} #{e.backtrace.join("\n")}"
        Promiscuous::Config.error_handler.try(:call, e)
      end
    end
  end
end