Class: Promiscuous::Subscriber::Worker::Pump

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/subscriber/worker/pump.rb

Instance Method Summary collapse

Constructor Details

#initialize(root) ⇒ Pump

Returns a new instance of Pump.



2
3
4
5
6
7
# File 'lib/promiscuous/subscriber/worker/pump.rb', line 2

# Creates a pump attached to the given root worker.
#
# @param root the owning worker object; kept in @root for later use by
#   the other instance methods.
def initialize(root)
  @root = root

  # The subscriber behaviour is mixed into this single instance at
  # construction time instead of being included at class-definition time.
  # NOTE(review): the original comment says this is deferred because the
  # backend can differ at runtime -- confirm against the AMQP backend setup.
  extend(Promiscuous::AMQP::Subscriber)
end

Instance Method Details

#connect ⇒ Object



9
10
11
12
13
14
15
16
17
18
# File 'lib/promiscuous/subscriber/worker/pump.rb', line 9

# Subscribes this pump to every configured subscriber exchange and routes
# each delivery to #on_message.
#
# @return whatever +subscribe+ returns.
def connect
  # Every exchange is bound with the '*' routing pattern: we need to
  # subscribe to everything to keep up with the version tracking.
  bindings = {}
  Promiscuous::Config.subscriber_exchanges.each do |exchange_name|
    bindings[exchange_name] = ['*']
  end

  handler = method(:on_message)
  subscribe({ :bindings => bindings }, &handler)
end

#on_message(metadata, payload) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/promiscuous/subscriber/worker/pump.rb', line 20

# Wraps an incoming delivery in a Promiscuous::Subscriber::Message and
# appends it to the root worker's runner queue.
#
# Any failure while building or enqueuing the message is logged via
# Promiscuous.warn and handed to the configured error notifier instead of
# propagating to the caller.
#
# @param metadata delivery metadata supplied by the subscription callback;
#   passed through to the message as its :metadata option.
# @param payload the raw message payload.
def on_message(metadata, payload)
  msg = Promiscuous::Subscriber::Message.new(payload, :metadata => metadata, :root_worker => @root)
  @root.runner.messages_to_process << msg
# NOTE(review): rescuing Exception also traps signals and exit requests raised
# inside this method; kept as-is to preserve behaviour -- confirm whether
# StandardError would be sufficient here.
rescue Exception => e
  Promiscuous.warn "[receive] cannot process message: #{e}\n#{e.backtrace.join("\n")}"
  Promiscuous::Config.error_notifier.call(e)
end