Class: Warren::Queue::AMQPAdapter
- Inherits: Warren::Queue
  - Object
  - Warren::Queue
  - Warren::Queue::AMQPAdapter
- Defined in: lib/warren/adapters/amqp_adapter.rb
Constant Summary
Constants inherited from Warren::Queue
InvalidAdapter, NoAdapterSet, NoBlockGiven, NoConnectionDetails
Class Method Summary
- .check_connection_details(opts) ⇒ Object
  Checks the connection details are correct for this adapter.
- .publish(queue_name, payload, &blk) ⇒ Object
  Sends a message to a queue.
- .queue_name ⇒ Object
  Returns the default queue name, or raises Warren::Connection::InvalidConnectionDetails if no default queue is defined.
- .subscribe(queue_name, &block) ⇒ Object
  Subscribes to a queue and runs the block for each message received.
Methods inherited from Warren::Queue
adapter, adapter=, connection, connection=, inherited, logger, logger=
Class Method Details
.check_connection_details(opts) ⇒ Object
Checks the connection details are correct for this adapter.

    # File 'lib/warren/adapters/amqp_adapter.rb', line 9
    def self.check_connection_details opts
      # Check they've passed in the stuff without a default on it
      unless opts.has_key?(:user) && opts.has_key?(:pass) && opts.has_key?(:vhost)
        raise Warren::Connection::InvalidConnectionDetails, "Missing a username, password or vhost."
      end
      true
    end
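The check above only tests for the presence of three keys. As a rough sketch of the same rule outside Warren, the helper below reports which of those keys are absent, which can be handy before handing options to the adapter. The name `missing_connection_keys` is hypothetical and not part of Warren.

```ruby
# Hypothetical helper (not part of Warren): lists the required keys
# that are absent from a connection options Hash.
def missing_connection_keys(opts)
  required = [:user, :pass, :vhost]
  required.reject { |key| opts.key?(key) }
end

complete   = { :user => "guest", :pass => "guest", :vhost => "/" }
incomplete = { :user => "guest" }

puts missing_connection_keys(complete).inspect    # prints []
puts missing_connection_keys(incomplete).inspect  # prints [:pass, :vhost]
```

An empty result corresponds to the case where the adapter's check returns true. A non-empty result corresponds to the case where it raises.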
.publish(queue_name, payload, &blk) ⇒ Object
Sends a message to a queue. Returns true if the message was sent successfully, unless a callback block is passed (see below).
Warren::Queue.publish(:queue_name, {:foo => "name"})
A block can also be passed; it is fired after the message is sent, and its return value is returned from this method.
Warren::Queue.publish(:queue_name, {:foo => "name"}) { puts "foo" }

    # File 'lib/warren/adapters/amqp_adapter.rb', line 40
    def self.publish queue_name, payload, &blk
      queue_name = self.queue_name if queue_name == :default

      # Create a message object if it isn't one already
      msg = Warren::MessageFilter.pack(payload)

      do_connect(true, blk) do
        queue = MQ::Queue.new(MQ.new, queue_name)
        queue.publish msg.to_s
      end
    end
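The return-value contract described above (true without a block, the block's value with one) and the `:default` substitution can be sketched without a broker. The stand-in below is an assumption-laden illustration: `sketch_publish` is a hypothetical name, messages are appended to a plain Hash of arrays, and nothing here touches `MQ` or `do_connect`.

```ruby
# Stand-in for the documented publish contract (not Warren's code):
# no broker is involved, messages are appended to a Hash of arrays.
def sketch_publish(queues, default_queue, queue_name, payload, &blk)
  # Mirror the :default substitution from the method above
  queue_name = default_queue if queue_name == :default
  (queues[queue_name] ||= []) << payload.to_s
  # With a block, return its value; otherwise return true
  blk ? blk.call : true
end

queues = {}
plain      = sketch_publish(queues, :jobs, :default, { :foo => "name" })
with_block = sketch_publish(queues, :jobs, :jobs, { :foo => "name" }) { "sent" }

puts plain.inspect         # prints true
puts with_block.inspect    # prints "sent"
puts queues[:jobs].length  # prints 2
```

Both calls land on the same queue because the first one resolves `:default` to `:jobs`.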
.queue_name ⇒ Object
Returns the default queue name, or raises Warren::Connection::InvalidConnectionDetails if no default queue is defined.

    # File 'lib/warren/adapters/amqp_adapter.rb', line 21
    def self.queue_name
      unless self.connection..has_key?(:default_queue)
        raise Warren::Connection::InvalidConnectionDetails, "Missing a default queue name."
      end
      self.connection.[:default_queue]
    end
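The lookup above is a "fetch or raise" on the `:default_queue` key. A minimal sketch of that pattern on a plain Hash follows; `default_queue_name` is a hypothetical helper, and `KeyError` stands in for Warren's own exception class so the example runs without the gem.

```ruby
# Hypothetical helper (not part of Warren): fetch the default queue
# name from an options Hash, raising if it is not configured.
# KeyError is a stand-in for Warren::Connection::InvalidConnectionDetails.
def default_queue_name(options)
  options.fetch(:default_queue) do
    raise KeyError, "Missing a default queue name."
  end
end

configured = default_queue_name({ :default_queue => :jobs })

# A caller can rescue the error and fall back or report it
error_message =
  begin
    default_queue_name({})
  rescue KeyError => e
    e.message
  end

puts configured.inspect  # prints :jobs
puts error_message       # prints Missing a default queue name.
```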
.subscribe(queue_name, &block) ⇒ Object

    # File 'lib/warren/adapters/amqp_adapter.rb', line 60
    def self.subscribe queue_name, &block
      raise NoBlockGiven unless block_given?
      queue_name = self.queue_name if queue_name == :default
      # todo: check if its a valid queue?
      do_connect(false) do
        queue = MQ::Queue.new(MQ.new, queue_name)
        queue.subscribe do |msg|
          msg = Warren::MessageFilter.unpack(msg)
          block.call(msg)
        end
      end
    end
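The shape of the method above is: refuse to run without a block, then unpack each incoming message and pass it to the block. The sketch below illustrates that flow under loud assumptions: `sketch_subscribe` is a hypothetical name, an Array stands in for the queue (a real AMQP subscription delivers messages asynchronously), a lambda stands in for `Warren::MessageFilter.unpack`, and `ArgumentError` stands in for `NoBlockGiven`.

```ruby
# Stand-in for the subscribe flow (not Warren's code): an Array plays
# the queue and a lambda plays the message filter's unpack step.
def sketch_subscribe(messages, unpack, &block)
  # ArgumentError is a stand-in for Warren::Queue::NoBlockGiven
  raise ArgumentError, "no block given" unless block
  messages.each do |raw|
    block.call(unpack.call(raw))
  end
end

received = []
upcase_filter = lambda { |raw| raw.upcase }
sketch_subscribe(["a", "b"], upcase_filter) { |msg| received << msg }

puts received.inspect  # prints ["A", "B"]
```

The block sees the unpacked message rather than the raw payload, which matches the reassignment of `msg` inside the subscription block above.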