Class: MobME::Infrastructure::Queue::Backends::AMQP
- Inherits:
-
MobME::Infrastructure::Queue::Backend
- Object
- MobME::Infrastructure::Queue::Backend
- MobME::Infrastructure::Queue::Backends::AMQP
- Defined in:
- lib/mobme/infrastructure/queue/backends/amqp.rb
Instance Method Summary
- #add(queue, item, metadata = {}) ⇒ Object
- #add_bulk(queue, items = []) ⇒ Object
Adds many items together.
- #empty(queue) ⇒ Object
- #initialize(options = {}) ⇒ AMQP
constructor
A new instance of AMQP.
- #list(queue) ⇒ Object
- #list_queues ⇒ Object
- #peek(queue) ⇒ Object
- #remove(queue) ⇒ Object
- #remove_queues(*queues) ⇒ Object (also: #remove_queue)
- #size(queue) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ AMQP
Returns a new instance of AMQP.
5 6 7 8 9 10 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 5
#
# Builds a new AMQP backend: stores the Bunny settings, starts with an empty
# queue registry and then runs `configure` (defined outside this listing).
#
# @param options [Hash] backend settings; only `:bunny_options` is read here
# @option options [Hash] :bunny_options kept in `@bunny_options`; an empty
#   Hash is used when the key is missing or nil
def initialize(options = {})
  @bunny_options = options[:bunny_options] || {}
  @amqp_queues = {}
  configure
end
Instance Method Details
#add(queue, item, metadata = {}) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 12
#
# Publishes one item to the named queue through the AMQP exchange, using the
# queue name as the routing key.
#
# NOTE(review): this listing had lost several identifiers. `metadata` is
# restored from the documented signature; the helper name
# `normalize_metadata` is a reconstruction -- confirm against the source file.
#
# @param queue [Object] queue name, also used as the routing key
# @param item [Object] payload handed to `serialize_item`
# @param metadata [Hash] extra data serialized together with the item
# @return [Object] whatever `@amqp_exchange.publish` returns
def add(queue, item, metadata = {})
  metadata = normalize_metadata(metadata)

  # Register the queue if needed; the returned handle is not used here.
  queue_for(queue)

  @amqp_exchange.publish(serialize_item(item, metadata), :key => queue)
end
#add_bulk(queue, items = []) ⇒ Object
Adds many items together
22 23 24 25 26 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 22
#
# Adds many items together by calling `add` once per item, in order, each
# with empty metadata.
#
# @param queue [Object] queue name passed through to `add`
# @param items [Array] items to enqueue; defaults to an empty list
# @return [Array] the `items` collection that was iterated
def add_bulk(queue, items = [])
  items.each do |item|
    add(queue, item, {})
  end
end
#empty(queue) ⇒ Object
45 46 47 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 45
#
# Purges the named queue by calling `purge` on the handle returned by
# `queue_for`.
#
# @param queue [Object] queue name
# @return [Object] whatever the queue handle's `purge` returns
def empty(queue)
  queue_for(queue).purge
end
#list(queue) ⇒ Object
41 42 43 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 41
#
# Listing the contents of a queue is not available on this backend; the
# method always raises.
#
# @param queue [Object] queue name (unused)
# @raise [NotImplementedError] always
def list(queue)
  raise NotImplementedError, "AMQP doesn't support list!"
end
#list_queues ⇒ Object
49 50 51 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 49
#
# Returns the keys of `@amqp_queues`, i.e. the queues this instance currently
# tracks. The registry is filled outside this listing (presumably by
# `queue_for` -- confirm), so queues this instance never touched are not
# reported.
#
# @return [Array] keys of the local queue registry
def list_queues
  @amqp_queues.keys
end
#peek(queue) ⇒ Object
33 34 35 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 33
#
# Peeking at the next item without removing it is not available on this
# backend; the method always raises.
#
# @param queue [Object] queue name (unused)
# @raise [NotImplementedError] always
def peek(queue)
  raise NotImplementedError, "AMQP doesn't support peek!"
end
#remove(queue) ⇒ Object
28 29 30 31 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 28
#
# Pops the next message off the named queue and decodes its payload.
#
# @param queue [Object] queue name
# @return [Object, nil] the result of `unserialize_item` for the popped
#   payload, or nil when the payload is the `:queue_empty` marker
def remove(queue)
  payload = queue_for(queue).pop[:payload]

  # The popped message carries :queue_empty as payload when nothing is waiting.
  return nil if payload == :queue_empty

  unserialize_item(payload)
end
#remove_queues(*queues) ⇒ Object Also known as: remove_queue
53 54 55 56 57 58 59 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 53
#
# Deletes the given queues, or every queue reported by `list_queues` when
# called without arguments. For each name the queue handle's `delete` is
# called and the entry is dropped from `@amqp_queues`.
#
# This page also lists the method under the name `remove_queue`.
#
# @param queues [Array] queue names; empty means "all tracked queues"
# @return [Array] the list of queue names that was processed
def remove_queues(*queues)
  queues = list_queues if queues.empty?

  queues.each do |queue|
    queue_for(queue).delete
    @amqp_queues.delete(queue)
  end
end
#size(queue) ⇒ Object
37 38 39 |
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 37
#
# Reports how many messages are waiting in the named queue.
#
# NOTE(review): the method name after `queue_for(queue).` was missing from
# this listing; `message_count` (the Bunny queue accessor) is a
# reconstruction -- confirm against the source file.
#
# @param queue [Object] queue name
# @return [Object] whatever the queue handle's `message_count` returns
def size(queue)
  queue_for(queue).message_count
end