Class: Factor::Runtime::MessageBus
- Inherits:
- Object
- Defined in:
- lib/runtime/message_bus.rb
Instance Attribute Summary
- #channel ⇒ Object
  Returns the value of attribute channel.
- #connection ⇒ Object
  Returns the value of attribute connection.
- #exchange ⇒ Object
  Returns the value of attribute exchange.
- #host ⇒ Object
  Returns the value of attribute host.
- #queue ⇒ Object
  Returns the value of attribute queue.
Instance Method Summary
- #close ⇒ Object
- #initialize(host = "queue.factor.io") ⇒ MessageBus (constructor)
  A new instance of MessageBus.
- #listen(routing_key = "#", &code) ⇒ Object
  Creates a new queue to listen to the topic exchange.
- #send(message) ⇒ Object
- #send_and_close(message) ⇒ Object
- #start(topic = "workflow", &code) ⇒ Object
  Creates the connection and a topic exchange. An exchange is where messages are sent; it routes each message to queues based on the routing key.
Constructor Details
#initialize(host = "queue.factor.io") ⇒ MessageBus
Returns a new instance of MessageBus.
# File 'lib/runtime/message_bus.rb', line 9
def initialize(host="queue.factor.io")
  @host = host
end
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
# File 'lib/runtime/message_bus.rb', line 7
def channel
  @channel
end
#connection ⇒ Object
Returns the value of attribute connection.
# File 'lib/runtime/message_bus.rb', line 7
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
# File 'lib/runtime/message_bus.rb', line 7
def exchange
  @exchange
end
#host ⇒ Object
Returns the value of attribute host.
# File 'lib/runtime/message_bus.rb', line 7
def host
  @host
end
#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/runtime/message_bus.rb', line 7
def queue
  @queue
end
Instance Method Details
#close ⇒ Object
# File 'lib/runtime/message_bus.rb', line 49
def close
  @connection.close { EventMachine.stop }
end
#listen(routing_key = "#", &code) ⇒ Object
Creates a new queue to listen to the topic exchange.
# File 'lib/runtime/message_bus.rb', line 26
def listen(routing_key="#", &code)
  queue_name = SecureRandom.hex
  @queue = @channel.queue(queue_name)
  @queue.bind(@exchange, :routing_key => routing_key) # bind queue to the exchange
  @queue.subscribe do |headers, payload|
    message = Message.new
    message.from_queue headers.routing_key, payload
    code.call(message)
  end
end
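Since #listen takes its queue name from SecureRandom.hex, each call appears to bind a fresh, uniquely named queue instead of sharing one with other listeners. The sketch below illustrates only that naming step; random_queue_name is a hypothetical helper introduced here, not part of MessageBus.

```ruby
require "securerandom"

# Hypothetical helper (not part of MessageBus): builds a queue name the same
# way #listen does, by asking SecureRandom for a random hex string.
def random_queue_name
  # With no argument, SecureRandom.hex uses 16 random bytes,
  # which are rendered as 32 hexadecimal characters.
  SecureRandom.hex
end

first  = random_queue_name
second = random_queue_name

puts first
puts second
```

Two listeners bound with the same routing key would therefore each hold their own queue, so under normal AMQP semantics each would receive its own copy of a matching message.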
#send(message) ⇒ Object
# File 'lib/runtime/message_bus.rb', line 38
def send(message)
  @exchange.publish(message.payload, :routing_key => message.route)
end
#send_and_close(message) ⇒ Object
# File 'lib/runtime/message_bus.rb', line 42
def send_and_close(message)
  send(message)
  EM.add_timer(1, Proc.new { close })
end
#start(topic = "workflow", &code) ⇒ Object
Creates the connection and a topic exchange. An exchange is where messages are sent; it routes each message to queues based on the routing key.
# File 'lib/runtime/message_bus.rb', line 16
def start(topic="workflow", &code)
  EventMachine.run do
    @connection = AMQP.connect(:host => @host)
    @channel = AMQP::Channel.new(connection)
    @exchange = @channel.topic(topic, :auto_delete => true) # new topic exchange
    code.call
  end
end
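To make the routing described above concrete, here is a minimal in-memory sketch of how a topic exchange matches routing keys against binding patterns. It does not use the amqp gem or a broker; ToyTopicExchange, topic_match? and match_words are hypothetical names introduced for illustration, and queues are modelled as plain arrays. It assumes the standard AMQP topic rules, where "*" stands for exactly one dot-separated word and "#" for zero or more words.

```ruby
# Returns true when a dot-separated routing key matches a topic binding
# pattern: "*" stands for exactly one word, "#" for zero or more words.
def topic_match?(pattern, key)
  match_words(pattern.split("."), key.split("."))
end

# Recursive word-by-word comparison used by topic_match?.
def match_words(pattern, key)
  return key.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # "#" may absorb any number of leading words, including none.
    (0..key.length).any? { |n| match_words(rest, key.drop(n)) }
  when "*"
    !key.empty? && match_words(rest, key.drop(1))
  else
    key.first == head && match_words(rest, key.drop(1))
  end
end

# In-memory stand-in for a topic exchange (an illustration, not the gem's API).
class ToyTopicExchange
  def initialize
    @bindings = [] # pairs of [pattern, queue]
  end

  def bind(queue, pattern)
    @bindings << [pattern, queue]
  end

  # Appends the payload once to every queue with at least one matching binding.
  def publish(payload, routing_key)
    matching = @bindings.select { |pattern, _queue| topic_match?(pattern, routing_key) }
    matching.map(&:last).uniq(&:object_id).each { |queue| queue << payload }
  end
end

exchange   = ToyTopicExchange.new
everything = []
started    = []
exchange.bind(everything, "#")                # like the default routing_key of #listen
exchange.bind(started, "workflow.*.started")

exchange.publish("a", "workflow.42.started")
exchange.publish("b", "workflow.42.finished")

p everything
p started
```

A queue bound with "#", the default routing_key of #listen, sees every message published to the exchange, while a narrower pattern filters by position within the key.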