Class: Factor::Runtime::MessageBus
- Inherits: Object
  - Object
  - Factor::Runtime::MessageBus
- Defined in: lib/runtime/message_bus.rb
Instance Attribute Summary
- #channel ⇒ Object
  Returns the value of attribute channel.
- #connection ⇒ Object
  Returns the value of attribute connection.
- #exchange ⇒ Object
  Returns the value of attribute exchange.
- #host ⇒ Object
  Returns the value of attribute host.
- #queue ⇒ Object
  Returns the value of attribute queue.
- #token ⇒ Object
  Returns the value of attribute token.
- #username ⇒ Object
  Returns the value of attribute username.
- #vhost ⇒ Object
  Returns the value of attribute vhost.
Instance Method Summary
- #close ⇒ Object
- #initialize(email, token) ⇒ MessageBus (constructor)
  A new instance of MessageBus.
- #listen(routing_key = "#", &code) ⇒ Object
  Creates a new queue to listen to the topic exchange.
- #send(message) ⇒ Object
- #send_and_close(message) ⇒ Object
- #start(topic = "workflow", &code) ⇒ Object
  Creates the connection and a topic exchange. An exchange is the place messages are sent to; it routes each message to the bound queues based on the routing key.
Constructor Details
#initialize(email, token) ⇒ MessageBus
Returns a new instance of MessageBus.
# File 'lib/runtime/message_bus.rb', line 10
def initialize(email, token)
  @host = "queue.factor.io"
  @vhost = email
  @username = email
  @token = token
end
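The constructor stores the email as both the vhost and the username, with a fixed host. As a rough illustration of how those four values could be assembled into the full settings hash that appears commented out in #start, here is a minimal sketch. `Credentials`, `build_credentials`, and `connection_settings` are hypothetical names used only for this example and are not part of MessageBus.

```ruby
# Hypothetical stand-in for the four attributes the constructor sets.
Credentials = Struct.new(:host, :vhost, :username, :token)

# Mirrors the constructor: the email doubles as vhost and username.
def build_credentials(email, token)
  Credentials.new("queue.factor.io", email, email, token)
end

# Builds a hash in the shape of the commented-out line in #start.
# Whether the broker accepts these settings is an assumption.
def connection_settings(creds)
  {
    :host     => creds.host,
    :user     => creds.username,
    :password => creds.token,
    :vhost    => creds.vhost
  }
end

creds = build_credentials("user@example.com", "secret-token")
puts connection_settings(creds).inspect
```

Note that #start as documented passes only `:host`, so the username, token, and vhost are stored but not sent when connecting.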
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
# File 'lib/runtime/message_bus.rb', line 8
def channel
  @channel
end
#connection ⇒ Object
Returns the value of attribute connection.
# File 'lib/runtime/message_bus.rb', line 8
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
# File 'lib/runtime/message_bus.rb', line 8
def exchange
  @exchange
end
#host ⇒ Object
Returns the value of attribute host.
# File 'lib/runtime/message_bus.rb', line 8
def host
  @host
end
#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/runtime/message_bus.rb', line 8
def queue
  @queue
end
#token ⇒ Object
Returns the value of attribute token.
# File 'lib/runtime/message_bus.rb', line 8
def token
  @token
end
#username ⇒ Object
Returns the value of attribute username.
# File 'lib/runtime/message_bus.rb', line 8
def username
  @username
end
#vhost ⇒ Object
Returns the value of attribute vhost.
# File 'lib/runtime/message_bus.rb', line 8
def vhost
  @vhost
end
Instance Method Details
#close ⇒ Object
# File 'lib/runtime/message_bus.rb', line 56
def close
  @connection.close { EventMachine.stop }
end
#listen(routing_key = "#", &code) ⇒ Object
Creates a new, randomly named queue, binds it to the topic exchange, and passes each received message to the given block.
# File 'lib/runtime/message_bus.rb', line 33
def listen(routing_key = "#", &code)
  queue_name = SecureRandom.hex
  @queue = @channel.queue(queue_name)
  @queue.bind(@exchange, :routing_key => routing_key) # bind queue to the exchange

  @queue.subscribe do |headers, payload|
    message = Message.new
    message.from_queue headers.routing_key, payload
    code.call(message)
  end
end
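The default routing key `"#"` is the AMQP topic wildcard for "zero or more words", which is why a listener with no argument receives everything published to the exchange; `*` matches exactly one word. The sketch below reimplements that matching rule in plain Ruby purely for illustration. `topic_match?` and `match_words` are hypothetical helpers; the real matching is done by the broker, not by MessageBus.

```ruby
# Returns true if a dot-separated routing key matches a topic pattern.
# "*" stands for exactly one word, "#" for zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, key)
  # An exhausted pattern only matches an exhausted key.
  return key.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # Try consuming 0, 1, 2, ... words of the key.
    (0..key.length).any? { |n| match_words(rest, key.drop(n)) }
  when "*"
    !key.empty? && match_words(rest, key.drop(1))
  else
    key.first == head && match_words(rest, key.drop(1))
  end
end

puts topic_match?("#", "workflow.started")
puts topic_match?("workflow.*", "workflow.started.now")
```

Passing a narrower pattern such as `"workflow.*"` to #listen would, under these rules, limit delivery to keys with exactly one word after `workflow`.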
#send(message) ⇒ Object
# File 'lib/runtime/message_bus.rb', line 45
def send(message)
  @exchange.publish(message.payload, :routing_key => message.route)
end
#send_and_close(message) ⇒ Object
# File 'lib/runtime/message_bus.rb', line 49
def send_and_close(message)
  send(message)
  EM.add_timer(1, Proc.new { close })
end
#start(topic = "workflow", &code) ⇒ Object
Creates the connection and a topic exchange. An exchange is the place messages are sent to; it routes each message to the bound queues based on the routing key.
# File 'lib/runtime/message_bus.rb', line 21
def start(topic = "workflow", &code)
  EventMachine.run do
    #connection_settings={:host=>@host,:user=>@username,:password=>@token,:vhost=>@vhost}
    connection_settings = { :host => @host }
    @connection = AMQP.connect(connection_settings)
    @channel = AMQP::Channel.new(connection)
    @exchange = @channel.topic(topic, :auto_delete => true) # new topic exchange
    code.call
  end
end
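To make the exchange-to-queue routing described above concrete without a broker, here is a toy in-memory model. It is a deliberately simplified sketch: `ToyTopicExchange` is a hypothetical class, not the amqp gem's API, and it supports only exact routing keys and the catch-all `"#"`.

```ruby
# Toy in-memory stand-in for a topic exchange (illustration only).
class ToyTopicExchange
  def initialize
    @bindings = [] # pairs of [routing key pattern, handler block]
  end

  # Registers a handler; "#" receives every message, like #listen's default.
  def bind(routing_key = "#", &handler)
    @bindings << [routing_key, handler]
    self
  end

  # Delivers the payload to every matching binding.
  # Returns the number of handlers that received it.
  def publish(payload, routing_key)
    matched = @bindings.select do |pattern, _handler|
      pattern == "#" || pattern == routing_key
    end
    matched.each { |_pattern, handler| handler.call(routing_key, payload) }
    matched.length
  end
end

exchange = ToyTopicExchange.new
everything = []
started = []
exchange.bind { |key, payload| everything << [key, payload] }
exchange.bind("workflow.started") { |_key, payload| started << payload }

exchange.publish("job 1", "workflow.started")
exchange.publish("job 2", "workflow.finished")
puts everything.length
puts started.inspect
```

In the real class the same roles are played by #start (creating the exchange), #listen (binding a queue), and #send (publishing with the message's route).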