Class: MassTransit::Amqp
- Inherits: Object (Object → MassTransit::Amqp)
- Defined in: lib/masstransit/transports/amqp.rb
Overview
A wrapper class around AMQP (via the Bunny client) that provides the standard MassTransit ‘transport’ API.
Instance Method Summary
- #bind(exchange) ⇒ Object
  binds the queue to the exchange.
- #close ⇒ Object
  closes the connection to the AMQP server.
- #create_message(data, serializer) ⇒ Object
  creates a transport-ready message object.
- #get_message(rmsg) ⇒ Object
- #monitor(&block) ⇒ Object
- #open(config) ⇒ Object
  opens a connection to the AMQP server.
- #publish(exchange, data) ⇒ Object
- #queue_declare(name) ⇒ Object
  declares a queue on the server.
- #queue_delete(name) ⇒ Object
- #send(queue, data) ⇒ Object
  publishes the message to the given queue.
- #unbind(queue, exchange) ⇒ Object
  unbinds the queue from the exchange.
Instance Method Details
#bind(exchange) ⇒ Object
binds the queue to the exchange
    # File 'lib/masstransit/transports/amqp.rb', line 38
    def bind(exchange)
      ex = @client.exchange(exchange, :type=>:fanout, :durable=>true)
      q = @client.queue(@queue)
      q.bind(ex)
    end
#close ⇒ Object
closes the connection to the AMQP server
    # File 'lib/masstransit/transports/amqp.rb', line 24
    def close()
      @client.close()
    end
#create_message(data, serializer) ⇒ Object
creates a transport-ready message object
    # File 'lib/masstransit/transports/amqp.rb', line 49
    def create_message(data, serializer)
      # "::" in the Ruby class name is replaced with "."
      msg_name = data.class.name.gsub("::",".")
      msg = data
      env = Envelope.new
      env.MessageType = msg_name # this needs to be a string for .NET
      env.Message = serializer.serialize(msg)
      return env
    end
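The naming step in create_message can be tried in isolation. The following is a minimal sketch, not the library's code: the Envelope class, JsonSerializer, and Demo::Ping below are hypothetical stand-ins introduced only for illustration, and JSON is an assumed wire format.

```ruby
require "json"

# Hypothetical stand-in for the library's Envelope class.
class Envelope
  attr_accessor :MessageType, :Message
end

# Hypothetical serializer; anything that responds to #serialize would do.
class JsonSerializer
  def serialize(msg)
    JSON.generate(msg.to_h)
  end
end

# Hypothetical message type nested in a module, so its name contains "::".
module Demo
  Ping = Struct.new(:text)
end

# Same steps as Amqp#create_message, written as a free-standing method.
def create_message(data, serializer)
  env = Envelope.new
  env.MessageType = data.class.name.gsub("::", ".") # "Demo::Ping" becomes "Demo.Ping"
  env.Message = serializer.serialize(data)
  env
end
```

Calling create_message(Demo::Ping.new("hi"), JsonSerializer.new) yields an envelope whose MessageType is the dotted string "Demo.Ping" and whose Message is the serialized payload.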
#get_message(rmsg) ⇒ Object
    # File 'lib/masstransit/transports/amqp.rb', line 61
    def get_message(rmsg)
      return rmsg[:payload]
    end
#monitor(&block) ⇒ Object
    # File 'lib/masstransit/transports/amqp.rb', line 65
    def monitor(&block)
      #basic consume / pop loop here
      q = @client.queue(@queue)
      q.subscribe(:consumer_tag => 'testtag1', :timeout => 30) do |msg|
        block.call msg
      end
    end
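How monitor and get_message fit together can be sketched without a broker. This is an illustration under stated assumptions: FakeQueue, FakeClient, SketchTransport, and collect_payloads are hypothetical names, and the fake queue simply yields each message as a Hash with a :payload key, which is the shape get_message expects.

```ruby
# Hypothetical stand-in for a Bunny queue: yields each stored payload as a Hash.
class FakeQueue
  def initialize(payloads)
    @payloads = payloads
  end

  def subscribe(_opts = {})
    @payloads.each { |payload| yield({ :payload => payload }) }
  end
end

# Hypothetical stand-in for the client object held in @client.
class FakeClient
  def initialize(queue)
    @queue = queue
  end

  def queue(_name)
    @queue
  end
end

# Same shape as Amqp#monitor and Amqp#get_message, with the client injected.
class SketchTransport
  def initialize(client, queue_name)
    @client = client
    @queue = queue_name
  end

  def monitor(&block)
    q = @client.queue(@queue)
    q.subscribe(:consumer_tag => 'testtag1', :timeout => 30) do |msg|
      block.call msg
    end
  end

  def get_message(rmsg)
    rmsg[:payload]
  end
end

# Runs the consume loop once and returns the extracted payloads in order.
def collect_payloads(payloads)
  transport = SketchTransport.new(FakeClient.new(FakeQueue.new(payloads)), "demo_queue")
  received = []
  transport.monitor { |rmsg| received << transport.get_message(rmsg) }
  received
end
```

The block passed to monitor receives the raw message; get_message is then used inside the block to pull out the payload.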
#open(config) ⇒ Object
opens a connection to the AMQP server
    # File 'lib/masstransit/transports/amqp.rb', line 9
    def open(config)
      @client = Bunny.new(
        :logging=>true,
        :host => config.server,
        :port => config.port,
        :user => config.user,
        :pass => config.password,
        :vhost => config.vdir,
        :insist => config.insist
      )
      @queue = config.queue
      @client.start
    end
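The config argument only needs to respond to the readers that open calls. A minimal sketch, assuming a plain Struct is acceptable: TransportConfig, bunny_options, and example_config are hypothetical names, and the values are illustrative defaults for a local broker rather than anything the library prescribes.

```ruby
# Hypothetical config object exposing the readers that Amqp#open calls.
TransportConfig = Struct.new(:server, :port, :user, :password, :vdir, :insist, :queue)

# Builds the same option hash that Amqp#open passes to Bunny.new.
def bunny_options(config)
  {
    :logging => true,
    :host    => config.server,
    :port    => config.port,
    :user    => config.user,
    :pass    => config.password,
    :vhost   => config.vdir,
    :insist  => config.insist
  }
end

# Illustrative values only; adjust to the broker actually in use.
def example_config
  TransportConfig.new("localhost", 5672, "guest", "guest", "/", false, "demo_queue")
end
```

Note that config.queue is not part of the connection options; open stores it in @queue for later use by bind and monitor.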
#publish(exchange, data) ⇒ Object
    # File 'lib/masstransit/transports/amqp.rb', line 79
    def publish(exchange, data)
      ex = @client.exchange(exchange, :type=>:fanout, :durable=>true)
      ex.publish(data)
    end
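Both bind and publish declare the exchange with :type=>:fanout. In AMQP, a fanout exchange copies each published message to every queue bound to it. The in-memory model below is only a sketch of that behavior; FanoutSketch is a hypothetical class and does not talk to a broker.

```ruby
# Hypothetical in-memory model of a fanout exchange.
class FanoutSketch
  def initialize
    @bound = []                                # names of bound queues
    @queues = Hash.new { |h, k| h[k] = [] }    # queue name => delivered messages
  end

  # Binding the same queue twice has no additional effect.
  def bind(queue_name)
    @bound << queue_name unless @bound.include?(queue_name)
  end

  # Every bound queue receives its own copy of the message.
  def publish(data)
    @bound.each { |name| @queues[name] << data }
  end

  def messages(queue_name)
    @queues[queue_name]
  end
end
```

A queue that was never bound receives nothing, which is why a consumer calls bind before expecting messages from publish.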
#queue_declare(name) ⇒ Object
declares a queue on the server
    # File 'lib/masstransit/transports/amqp.rb', line 29
    def queue_declare(name)
      @client.queue(name)
    end
#queue_delete(name) ⇒ Object
    # File 'lib/masstransit/transports/amqp.rb', line 33
    def queue_delete(name)
      #how to?
    end
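The method body above is an open question in the source. One possible approach, offered as an assumption rather than the author's intent, is to look the queue up by name and call delete on it, since Bunny queue objects provide a delete method. The sketch uses hypothetical stand-ins (RecordingQueue, RecordingClient, DeleteSketch) so it runs without a broker.

```ruby
# Hypothetical stand-in for a Bunny queue that records whether it was deleted.
class RecordingQueue
  attr_reader :name, :deleted

  def initialize(name)
    @name = name
    @deleted = false
  end

  def delete
    @deleted = true
  end
end

# Hypothetical stand-in for the client held in @client.
class RecordingClient
  def initialize
    @queues = {}
  end

  def queue(name)
    @queues[name] ||= RecordingQueue.new(name)
  end
end

# Possible shape for Amqp#queue_delete (an assumption, not the library's code).
class DeleteSketch
  def initialize(client)
    @client = client
  end

  def queue_delete(name)
    @client.queue(name).delete
  end
end
```

Whether deletion should be guarded (for example, only when the queue is empty or unused) is left open here.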
#send(queue, data) ⇒ Object
publishes the message to the given queue
    # File 'lib/masstransit/transports/amqp.rb', line 75
    def send(queue, data)
      @client.queue(queue).publish(data)
    end
#unbind(queue, exchange) ⇒ Object
unbinds the queue from the exchange (the method body is currently empty)
    # File 'lib/masstransit/transports/amqp.rb', line 45
    def unbind(queue, exchange)
    end