Class: Untied::Publisher::AMQP::Producer

Inherits:
BaseProducer show all
Defined in:
lib/untied-publisher/amqp/producer.rb

Instance Attribute Summary

Attributes inherited from BaseProducer

#deliver_messages, #routing_key, #service_name

Instance Method Summary collapse

Methods inherited from BaseProducer

#publish

Constructor Details

#initialize(opts = {}) ⇒ Producer

Encapsulates both the Channel and Exchange (AMQP).



10
11
12
13
14
15
16
17
# File 'lib/untied-publisher/amqp/producer.rb', line 10

# Builds the producer and picks the AMQP channel it will publish through.
#
# opts - options Hash; it is forwarded unchanged to the parent initializer
#        and may carry a :channel entry.
#
# The globally registered ::AMQP.channel takes precedence over
# opts[:channel]. When neither is available @channel is left unset.
def initialize(opts={})
  super
  check_em_reactor

  channel = ::AMQP.channel || opts[:channel]
  return unless channel

  say "Using defined AMQP.channel"
  @channel = channel
end

Instance Method Details

#check_em_reactor ⇒ Object



34
35
36
37
38
39
# File 'lib/untied-publisher/amqp/producer.rb', line 34

# Guards against using the producer outside of EventMachine.
#
# Returns nil when the EventMachine constant is defined and
# EM.reactor_running? is truthy; raises a RuntimeError otherwise.
def check_em_reactor
  return if defined?(EventMachine) && EM.reactor_running?

  raise "In order to use the producer you must be running inside an " \
    "eventmachine loop"
end

#on_exchange(&block) ⇒ Object

Creates a new exchange and yields it to the given block once it is ready.



29
30
31
32
# File 'lib/untied-publisher/amqp/producer.rb', line 29

# Declares the auto-deleting 'untied' topic exchange on the stored channel,
# handing the given block over to the channel's #topic call.
#
# Returns nil without doing anything when no channel has been set;
# otherwise returns whatever @channel.topic returns.
def on_exchange(&block)
  if @channel
    @channel.topic('untied', :auto_delete => true, &block)
  end
end

#safe_publish(e) ⇒ Object

Publish the given event.

event: object which is going to be serialized and sent through the
wire. It should respond to #to_json.


22
23
24
25
26
# File 'lib/untied-publisher/amqp/producer.rb', line 22

# Publishes the given event through the exchange supplied by #on_exchange.
#
# e - the event to send; it must respond to #to_json. Serialization happens
#     inside the callback, i.e. only once the exchange has been yielded.
#
# The message is published with @routing_key as its routing key.
def safe_publish(e)
  on_exchange { |topic_exchange| topic_exchange.publish(e.to_json, :routing_key => @routing_key) }
end