Class: Qup::Adapter::Redis::Queue
- Inherits: Connection
  - Object
    - Connection
      - Qup::Adapter::Redis::Queue
- Includes: QueueAPI
- Defined in: lib/qup/adapter/redis/queue.rb
Overview
Internal: The Qup implementation for a Redis Queue
Instance Attribute Summary
Attributes inherited from Connection
Instance Method Summary
- #acknowledge(message) ⇒ Object
  Internal: Acknowledge that message is completed and remove it from the Queue.
- #consume(&block) ⇒ Object
  Internal: Retrieve a Message from the Queue.
- #depth ⇒ Object
  Internal: return the number of Messages on the Queue.
- #destroy ⇒ Object (also: #flush)
  Internal: Destroy the queue.
- #initialize(uri, name) ⇒ Queue (constructor)
  Internal: create a new Queue.
- #produce(message) ⇒ Object
  Internal: Put an item onto the Queue.
Methods included from QueueAPI
Constructor Details
#initialize(uri, name) ⇒ Queue
Internal: create a new Queue
uri  - the connection uri for the Redis Client
name - the String name of the Queue
Returns a new Queue.
    # File 'lib/qup/adapter/redis/queue.rb', line 16
    def initialize( uri, name )
      super
      @open_messages = {}
    end
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
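In Redis, acknowledging does not touch the server: the Message was already removed from the list when it was consumed. The tracking is only performed to meet the API requirements; acknowledging a Message that is not currently being consumed raises Qup::Error.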
Returns nothing.
    # File 'lib/qup/adapter/redis/queue.rb', line 52
    def acknowledge( message )
      open_msg = @open_messages.delete( message.key )
      raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
    end
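The local bookkeeping described above can be illustrated without Redis or Qup. The following is a minimal sketch, assuming a hash keyed by message key; SketchMessage, SketchError, and OpenMessageTracker are hypothetical stand-ins invented for this example and are not part of Qup.

```ruby
# Stand-ins for illustration only; these are not the Qup classes.
SketchMessage = Struct.new(:key, :data)
class SketchError < StandardError; end

class OpenMessageTracker
  def initialize
    @open_messages = {}
  end

  # Record a message as currently being consumed.
  def open(message)
    @open_messages[message.key] = message
  end

  # Forget the message. Raises if the key is not tracked, which covers both
  # "never consumed" and "already acknowledged".
  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise SketchError, "Message #{message.key} is not currently being consumed" unless open_msg
    nil
  end
end

tracker = OpenMessageTracker.new
msg = SketchMessage.new(1, "payload")
tracker.open(msg)
tracker.acknowledge(msg)     # first acknowledgement succeeds

begin
  tracker.acknowledge(msg)   # second one raises: the key is no longer tracked
rescue SketchError => e
  puts e.message
end
```

Nothing in this sketch talks to a server, which mirrors the point made above: the hash exists only so that a double or stray acknowledgement can be detected.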
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Blocks until a Message is available on the Queue.
Yields the Message if a block is given.
Returns the Message if no block is given.
    # File 'lib/qup/adapter/redis/queue.rb', line 75
    def consume(&block)
      queue_name, data = @client.brpop @name, 0 # blocking pop
      message = ::Qup::Message.new( data.object_id, data )
      @open_messages[message.key] = message
      if block_given? then
        yield_message( message, &block )
      else
        return message
      end
    end
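The two calling styles of consume, and the first-in, first-out order that results from pairing LPUSH with BRPOP, can be sketched without a Redis server. Everything below is an assumption made for illustration: FakeListClient, ConsumedMessage, and SketchQueue are hypothetical names, the fake client returns nil on an empty list where the real BRPOP would block, and the block form simply yields the message.

```ruby
# In-memory stand-in for the Redis client; only the calls used here are mimicked.
class FakeListClient
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH adds to the head of the list and returns the new length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # BRPOP removes from the tail and returns [key, value].
  # The real command blocks while the list is empty; this stand-in returns nil.
  def brpop(key, _timeout)
    value = @lists[key].pop
    value.nil? ? nil : [key, value]
  end
end

ConsumedMessage = Struct.new(:key, :data)

class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
    @open_messages = {}
  end

  def produce(data)
    @client.lpush(@name, data)
  end

  # Pops one item, wraps it, tracks it as open, then returns or yields it.
  def consume
    _queue_name, data = @client.brpop(@name, 0)
    message = ConsumedMessage.new(data.object_id, data)
    @open_messages[message.key] = message
    return message unless block_given?
    yield message
  end
end

queue = SketchQueue.new(FakeListClient.new, "jobs")
queue.produce("first")
queue.produce("second")

message = queue.consume             # no block: the oldest item is returned
puts message.data
queue.consume { |m| puts m.data }   # block: the next item is yielded
```

Pushing at the head and popping from the tail is what makes the list behave as a queue rather than a stack.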
#depth ⇒ Object
Internal: return the number of Messages on the Queue
Returns the Integer depth of the Queue.
    # File 'lib/qup/adapter/redis/queue.rb', line 41
    def depth
      @client.llen name
    end
#destroy ⇒ Object
Also known as: flush
Internal: Destroy the queue
Removes the list from redis.
Returns nothing.
    # File 'lib/qup/adapter/redis/queue.rb', line 26
    def destroy
      @client.del name
    end
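Since depth and destroy delegate to the Redis LLEN and DEL commands, their interplay can be sketched with an in-memory stand-in. FakeRedisLists is a hypothetical class written for this example; it only mimics the documented return values of LPUSH, LLEN, and DEL for a single key.

```ruby
# In-memory stand-in for the Redis client; not a real Redis connection.
class FakeRedisLists
  def initialize
    @lists = {}
  end

  # LPUSH adds to the head of the list and returns the new length.
  def lpush(key, value)
    (@lists[key] ||= []).unshift(value)
    @lists[key].length
  end

  # LLEN reports 0 for a key that does not exist.
  def llen(key)
    @lists.fetch(key, []).length
  end

  # DEL returns the number of keys that were removed.
  def del(key)
    @lists.delete(key) ? 1 : 0
  end
end

client = FakeRedisLists.new
client.lpush("jobs", "a")
client.lpush("jobs", "b")
puts client.llen("jobs")   # depth before destroy
client.del("jobs")
puts client.llen("jobs")   # depth after destroy
```

Because a missing key has length 0, a destroyed queue and an empty queue report the same depth.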
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
The ‘message’ that is passed in is pushed onto the Redis list as given; it is wrapped in a Qup::Message, keyed by its object_id, for the return value.
Returns the Message that was put onto the Queue
    # File 'lib/qup/adapter/redis/queue.rb', line 65
    def produce( message )
      @client.lpush name, message
      return ::Qup::Message.new( message.object_id, message )
    end
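What produce hands back can be sketched as follows. This is an illustration under stated assumptions: ProducedMessage is a hypothetical stand-in for Qup::Message, sketch_produce is an invented helper, and a plain Array with unshift plays the role of the Redis list and LPUSH.

```ruby
ProducedMessage = Struct.new(:key, :data)   # hypothetical stand-in for Qup::Message

# `list` stands in for the Redis list; unshift plays the role of LPUSH.
def sketch_produce(list, payload)
  list.unshift(payload)
  ProducedMessage.new(payload.object_id, payload)
end

list = []
payload = "resize image 42"
message = sketch_produce(list, payload)

puts message.key == payload.object_id   # the key is the payload's object_id
puts list.first.equal?(payload)         # the list holds the raw payload, not the wrapper
```

An object_id is only meaningful inside one Ruby process, so a key built this way identifies the message locally rather than across producers and consumers.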