Class: Workling::Clients::AmqpClient
- Inherits:
-
BrokerBase
- Object
- Base
- BrokerBase
- Workling::Clients::AmqpClient
- Defined in:
- lib/workling/clients/amqp_client.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#close ⇒ Object
no need for explicit closing.
-
#connect ⇒ Object
starts the client.
- #request(key, value) ⇒ Object
-
#retrieve(key) ⇒ Object
request and retrieve work.
-
#subscribe(key) ⇒ Object
subscribe to a queue.
Methods inherited from BrokerBase
Methods inherited from Base
#dispatch, installed?, #logger
Class Method Details
.load ⇒ Object
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/workling/clients/amqp_client.rb', line 8 def self.load begin require 'mq' rescue Exception => e raise WorklingError.new( "WORKLING: couldn't find the ruby amqp client - you need it for the amqp runner. " \ "Install from github: gem sources -a http://gems.github.com/ && sudo gem install tmm1-amqp " ) end end |
Instance Method Details
#close ⇒ Object
no need for explicit closing. when the event loop terminates, the connection is closed anyway.
31 |
# File 'lib/workling/clients/amqp_client.rb', line 31 def close; true; end |
#connect ⇒ Object
starts the client.
20 21 22 23 24 25 26 27 |
# File 'lib/workling/clients/amqp_client.rb', line 20 def connect begin connection = AMQP.start((Workling.config[:amqp_options] ||{}).symbolize_keys) @amq = MQ.new connection rescue raise WorklingError.new("couldn't start amq client. if you're running this in a server environment, then make sure the server is evented (ie use thin or evented mongrel, not normal mongrel.)") end end |
#request(key, value) ⇒ Object
43 |
# File 'lib/workling/clients/amqp_client.rb', line 43 def request(key, value); @amq.queue(queue_for(key)).publish(Marshal.dump(value)); end |
#retrieve(key) ⇒ Object
request and retrieve work
42 |
# File 'lib/workling/clients/amqp_client.rb', line 42 def retrieve(key); @amq.queue(queue_for(key)); end |
#subscribe(key) ⇒ Object
subscribe to a queue
34 35 36 37 38 39 |
# File 'lib/workling/clients/amqp_client.rb', line 34 def subscribe(key) @amq.queue(queue_for(key)).subscribe do |value| data = Marshal.load(value) rescue value yield data end end |