Class: Untied::Publisher::AMQP::Producer
- Inherits:
- BaseProducer (Object > BaseProducer > Untied::Publisher::AMQP::Producer)
- Defined in:
- lib/untied-publisher/amqp/producer.rb
Instance Attribute Summary
Attributes inherited from BaseProducer
#deliver_messages, #routing_key, #service_name
Instance Method Summary
- #check_em_reactor ⇒ Object
  Raises an error unless an EventMachine reactor is running.
- #initialize(opts = {}) ⇒ Producer (constructor)
  Encapsulates both the Channel and Exchange (AMQP).
- #on_exchange(&block) ⇒ Object
  Creates the 'untied' topic exchange and yields it to the given block when it is ready.
- #safe_publish(e) ⇒ Object
  Publishes the given event.
Methods inherited from BaseProducer
Constructor Details
#initialize(opts = {}) ⇒ Producer
Encapsulates both the Channel and Exchange (AMQP).
    # File 'lib/untied-publisher/amqp/producer.rb', line 10
    def initialize(opts={})
      super
      check_em_reactor
      if ::AMQP.channel || opts[:channel]
        say "Using defined AMQP.channel"
        @channel = ::AMQP.channel || opts[:channel]
      end
    end
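The constructor picks its channel with `::AMQP.channel || opts[:channel]`, so a globally defined channel takes precedence over one passed in `opts`. The sketch below isolates that expression; `resolve_channel` is a hypothetical helper written for illustration, not part of the library, and the symbols stand in for real channel objects.

```ruby
# Hypothetical helper mirroring `::AMQP.channel || opts[:channel]`.
# `global_channel` plays the role of ::AMQP.channel.
def resolve_channel(global_channel, opts = {})
  global_channel || opts[:channel]
end

# A global channel wins even when opts[:channel] is given.
picked_global   = resolve_channel(:global, :channel => :explicit)
# With no global channel, opts[:channel] is used.
picked_explicit = resolve_channel(nil, :channel => :explicit)
# With neither, the result is nil and @channel would stay unset.
picked_none     = resolve_channel(nil)
```

When neither source provides a channel, `@channel` is never assigned, which is the case `#on_exchange` guards against with `return unless @channel`.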
Instance Method Details
#check_em_reactor ⇒ Object
Raises an error unless EventMachine is loaded and its reactor is running.
    # File 'lib/untied-publisher/amqp/producer.rb', line 34
    def check_em_reactor
      if !defined?(EventMachine) || !EM.reactor_running?
        raise "In order to use the producer you must be running inside an " + \
          "eventmachine loop"
      end
    end
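The guard can be exercised on its own, as in the minimal sketch below. It copies the check into a standalone method and runs it twice: once with no EventMachine loaded, and once against a stub. The `EventMachine` module defined here is a hypothetical stand-in that only answers `reactor_running?`; it is not the real gem.

```ruby
# The guard from the source, copied into a standalone method.
def check_em_reactor
  if !defined?(EventMachine) || !EM.reactor_running?
    raise "In order to use the producer you must be running inside an " \
          "eventmachine loop"
  end
end

# 1. EventMachine is not loaded: the guard raises a RuntimeError.
without_reactor =
  begin
    check_em_reactor
    :ok
  rescue RuntimeError => error
    error.message
  end

# 2. Hypothetical stub (NOT the real gem) that reports a running reactor.
module EventMachine
  def self.reactor_running?
    true
  end
end
EM = EventMachine

# With the stub in place the guard passes silently.
with_reactor =
  begin
    check_em_reactor
    :ok
  rescue RuntimeError => error
    error.message
  end
```

With the real eventmachine gem, satisfying the guard means constructing the producer inside an `EM.run { ... }` block.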
#on_exchange(&block) ⇒ Object
Creates the 'untied' topic exchange (with :auto_delete => true) on the channel and yields it to the given block when it is ready. Returns without calling the block if no channel is set.
    # File 'lib/untied-publisher/amqp/producer.rb', line 29
    def on_exchange(&block)
      return unless @channel
      @channel.topic('untied', :auto_delete => true, &block)
    end
#safe_publish(e) ⇒ Object
Publish the given event.
e: the event object to be serialized and sent over the wire. It must respond to #to_json.
    # File 'lib/untied-publisher/amqp/producer.rb', line 22
    def safe_publish(e)
      on_exchange do |exchange|
        exchange.publish(e.to_json, :routing_key => @routing_key)
      end
    end
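The publish path can be traced without a broker. The following is a minimal sketch, not the library's code: `FakeExchange`, `FakeChannel`, `SketchProducer`, `Event`, and the routing key `'untied.core'` are hypothetical names introduced for illustration. The stand-ins only mimic the `topic` and `publish` calls shown in the source, and the fake channel yields the exchange immediately, whereas a real AMQP channel would do so asynchronously.

```ruby
require 'json'

# Hypothetical stand-in for an AMQP exchange: records what was published.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, opts = {})
    @published << [payload, opts[:routing_key]]
  end
end

# Hypothetical stand-in for an AMQP channel: yields the exchange at once.
class FakeChannel
  attr_reader :exchange, :declared

  def initialize
    @exchange = FakeExchange.new
    @declared = []
  end

  def topic(name, opts = {})
    @declared << [name, opts]
    yield @exchange
    @exchange
  end
end

# Mirrors the on_exchange/safe_publish logic shown in the source.
class SketchProducer
  def initialize(channel, routing_key)
    @channel = channel
    @routing_key = routing_key
  end

  def on_exchange(&block)
    return unless @channel
    @channel.topic('untied', :auto_delete => true, &block)
  end

  def safe_publish(e)
    on_exchange do |exchange|
      exchange.publish(e.to_json, :routing_key => @routing_key)
    end
  end
end

# Hypothetical event; any object responding to #to_json would do.
class Event
  def initialize(name, payload)
    @name = name
    @payload = payload
  end

  def to_json(*args)
    { :name => @name, :payload => @payload }.to_json(*args)
  end
end

channel = FakeChannel.new
producer = SketchProducer.new(channel, 'untied.core')
producer.safe_publish(Event.new('after_create', { :id => 1 }))

# Without a channel, on_exchange returns early and nothing is published.
no_channel_result = SketchProducer.new(nil, 'untied.core')
                                  .safe_publish(Event.new('after_create', {}))
```

Note that, as written in the source, a missing channel makes `safe_publish` a silent no-op rather than an error, so callers that need delivery guarantees have to check for a channel themselves.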