Class: MassTransit::Bus
- Inherits:
-
Object
- Object
- MassTransit::Bus
- Defined in:
- lib/masstransit/bus.rb
Overview
The bus abstracts the desired transportation, and manages the callbacks orchestrates the threading
three main queues (data, control, poison)
Instance Method Summary collapse
- #close ⇒ Object
-
#consume(rmsg) ⇒ Object
takes a rabbitmq message, strips off the noise and gets back to an envelope.
-
#deliver(env) ⇒ Object
for local distribution.
-
#initialize(conf) ⇒ Bus
constructor
A new instance of Bus.
-
#publish(message) ⇒ Object
this will publish the message object to an exchange in rabbitmq that is equal to the message class name.
-
#start ⇒ Object
tells the bus to start listening for messages this method blocks forver.
-
#subscribe(message_name, &block) ⇒ Object
this will register an exchange in rabbit for the ‘message_name’ and then bind the queue to that exchange.
-
#unsubscribe(message_name) ⇒ Object
this will unregister the queue with the exchange in rabbitmq for the message_name.
Constructor Details
#initialize(conf) ⇒ Bus
Returns a new instance of Bus.
7 8 9 10 11 12 13 14 |
# File 'lib/masstransit/bus.rb', line 7 def initialize(conf) @subscriptions = {} @serializer = conf.serializer @transport = conf.transport @queue = conf.queue @transport.open(conf) end |
Instance Method Details
#close ⇒ Object
49 50 51 |
# File 'lib/masstransit/bus.rb', line 49 def close() end |
#consume(rmsg) ⇒ Object
takes a rabbitmq message, strips off the noise and gets back to an envelope
65 66 67 68 69 70 71 72 |
# File 'lib/masstransit/bus.rb', line 65 def consume(rmsg) data = @transport.(rmsg) #payload is a string env = @serializer.deserialize data puts 'oeuaoeusatoehuntsaoheuntsahoeusnth' puts env deliver(env) end |
#deliver(env) ⇒ Object
for local distribution
75 76 77 78 79 80 81 82 |
# File 'lib/masstransit/bus.rb', line 75 def deliver(env) consumers = @subscriptions[env.MessageType] consumers = [] if consumers.nil? consumers.each do |c| obj = @serializer.deserialize(env.Message) c.call obj end end |
#publish(message) ⇒ Object
this will publish the message object to an exchange in rabbitmq that is equal to the message class name. this is a direct concept from .net and should be adopted into a more ruby manner
56 57 58 59 60 61 |
# File 'lib/masstransit/bus.rb', line 56 def publish() envelope = @transport.(, @serializer) data = @serializer.serialize(envelope) @transport.publish(envelope.MessageType, data) #exchange? end |
#start ⇒ Object
tells the bus to start listening for messages this method blocks forver. Need to implement better ctrl-c support
42 43 44 45 46 47 |
# File 'lib/masstransit/bus.rb', line 42 def start() #start listening @transport.monitor do |rmsg| consume(rmsg) end end |
#subscribe(message_name, &block) ⇒ Object
this will register an exchange in rabbit for the ‘message_name’ and then bind the queue to that exchange. it then sets the subscriptions to the callback provided
19 20 21 22 23 24 25 26 |
# File 'lib/masstransit/bus.rb', line 19 def subscribe(, &block) @subscriptions = {} if @subscriptions.nil? consumers = @subscriptions[] consumers = [] if consumers.nil? consumers << block @subscriptions[] = consumers @transport.bind end |
#unsubscribe(message_name) ⇒ Object
this will unregister the queue with the exchange in rabbitmq for the message_name. It then removes the callbacks in the subscriptions
31 32 33 34 35 36 37 |
# File 'lib/masstransit/bus.rb', line 31 def unsubscribe() @transport.unbind(, @queue) #has key check @subscriptions.delete() end |