Class: Qup::Adapter::Kestrel::Queue
- Inherits:
- Qup::Adapter::Kestrel::Queue < Destination < Object
- Includes:
- QueueAPI
- Defined in:
- lib/qup/adapter/kestrel/queue.rb
Overview
Internal: The Implementation of Queue in the Kestrel Adapter
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Internal: The name of the Queue.
Instance Method Summary
- #acknowledge(message) ⇒ Object
  Internal: Acknowledge that the message is completed and remove it from the Queue.
- #consume(&block) ⇒ Object
  Internal: Retrieve a Message from the Queue.
- #depth ⇒ Object
  Internal: Return the number of Messages on the Queue.
- #flush ⇒ Object
  Internal: Remove all messages from the Queue.
- #initialize(address, name) ⇒ Queue (constructor)
  Internal: Create a new Queue.
- #produce(message) ⇒ Object
  Internal: Put an item onto the Queue.
Methods included from QueueAPI
#consumer, #destroy, #producer
Methods inherited from Destination
Constructor Details
#initialize(address, name) ⇒ Queue
Internal: Create a new Queue
address - the Connection Address String for the Kestrel Client
name    - the String name of the Queue
Returns a new Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 16
def initialize( address, name )
  super(address, name)
  @open_messages = {}
end
Instance Attribute Details
#name ⇒ Object (readonly)
Internal: The name of the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 22
def name
  @name
end
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that the message is completed and remove it from the Queue.
For Kestrel, this closes the last open transaction on the client. The message passed in is only used to check that it is currently being consumed; a Qup::Error is raised if it is not.
Returns nothing
# File 'lib/qup/adapter/kestrel/queue.rb', line 85
def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close_last_transaction
end
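The bookkeeping in #acknowledge can be tried out without a Kestrel server. The following is a minimal sketch, not the adapter itself: `AckSketchError`, `AckSketchMessage`, `AckFakeClient`, `AckSketchQueue` and its `open` method are all hypothetical stand-ins for Qup::Error, Qup::Message, the Kestrel client and the Queue. It shows that a message can be acknowledged exactly once while it is open.

```ruby
# Hypothetical stand-ins; none of these classes come from the Qup gem.
AckSketchError   = Class.new(StandardError)
AckSketchMessage = Struct.new(:key, :data)

# Counts closed transactions in place of a real Kestrel client.
class AckFakeClient
  attr_reader :closed

  def initialize
    @closed = 0
  end

  def close_last_transaction
    @closed += 1
  end
end

class AckSketchQueue
  def initialize(client)
    @client        = client
    @open_messages = {}
  end

  # Mark a message as handed out but not yet acknowledged.
  def open(message)
    @open_messages[message.key] = message
  end

  # Same shape as #acknowledge: reject unknown messages, then close.
  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise AckSketchError, "Message #{message.key} is not currently being consumed" unless open_msg
    @client.close_last_transaction
  end
end

ack_client  = AckFakeClient.new
ack_queue   = AckSketchQueue.new(ack_client)
ack_message = AckSketchMessage.new(1, "payload")

ack_queue.open(ack_message)
ack_queue.acknowledge(ack_message) # first acknowledgement succeeds

second_ack_error = begin
  ack_queue.acknowledge(ack_message) # the key is gone, so this raises
  nil
rescue AckSketchError => e
  e
end
```

Deleting the key before closing the transaction is what makes a second acknowledgement of the same message fail instead of closing an unrelated transaction.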
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Yields a Message
A user of the Qup API should use a Consumer instance to retrieve items from the Queue.
Returns a Message
# File 'lib/qup/adapter/kestrel/queue.rb', line 66
def consume(&block)
  data = @client.get( @name )
  message = ::Qup::Message.new( data.object_id, data )
  @open_messages[message.key] = message
  if block_given? then
    # The helper name was lost from this listing; yield_message is an assumed name.
    yield_message( message, &block )
  else
    return message
  end
end
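The two calling styles of #consume, with and without a block, can be sketched with an in-memory queue. Everything here is hypothetical (`ConsumeSketchMessage`, `ConsumeSketchQueue`, the array-backed storage). What happens after the block returns is not visible in the listing, so this sketch assumes the message is acknowledged automatically in the block form and must be acknowledged by the caller otherwise.

```ruby
# Hypothetical stand-ins; none of these classes come from the Qup gem.
ConsumeSketchMessage = Struct.new(:key, :data)

class ConsumeSketchQueue
  attr_reader :open_messages

  def initialize(items)
    @items         = items.dup
    @open_messages = {}
  end

  # Take the next item, wrap it, and track it as open.
  def consume
    data    = @items.shift
    message = ConsumeSketchMessage.new(data.object_id, data)
    @open_messages[message.key] = message
    return message unless block_given?

    begin
      yield message
    ensure
      acknowledge(message) # assumption: the block form acknowledges for the caller
    end
  end

  def acknowledge(message)
    removed = @open_messages.delete(message.key)
    raise ArgumentError, "message #{message.key} is not open" unless removed
    nil
  end
end

consume_queue = ConsumeSketchQueue.new(["first", "second"])

# Block form: the message is yielded and no longer open afterwards.
seen = nil
block_result = consume_queue.consume { |m| seen = m.data; :done }
open_after_block = consume_queue.open_messages.size

# Non-block form: the caller holds the message until acknowledging it.
held = consume_queue.consume
open_while_held = consume_queue.open_messages.size
consume_queue.acknowledge(held)
open_after_ack = consume_queue.open_messages.size
```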
#depth ⇒ Object
Internal: Return the number of Messages on the Queue
Returns the Queue depth as an Integer
# File 'lib/qup/adapter/kestrel/queue.rb', line 35
def depth
  stats = @admin_client.stat( @name )
  return stats['items']
end
#flush ⇒ Object
Internal: Remove all messages from the Queue
Returns nothing.
# File 'lib/qup/adapter/kestrel/queue.rb', line 27
def flush
  @admin_client.flush(@name)
end
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
The ‘message’ that is passed in is stored on the queue as-is; the return value wraps it in a Qup::Message.
A user of the Qup API should use a Producer instance to put items onto the queue.
Returns the Message that was put onto the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 52
def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end
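The relationship between what #produce stores and what it returns can be illustrated with a recording client. This is a sketch under stated assumptions: `ProduceSketchMessage`, `ProduceRecordingClient` and `ProduceSketchQueue` are hypothetical names, and the client only mimics a `set(queue_name, item)` call by appending to an in-memory list.

```ruby
# Hypothetical stand-ins; none of these classes come from the Qup gem.
ProduceSketchMessage = Struct.new(:key, :data)

# Remembers every item set on each queue name.
class ProduceRecordingClient
  attr_reader :stored

  def initialize
    @stored = Hash.new { |hash, queue_name| hash[queue_name] = [] }
  end

  def set(queue_name, item)
    @stored[queue_name] << item
  end
end

class ProduceSketchQueue
  def initialize(client, name)
    @client = client
    @name   = name
  end

  # Store the payload, then return a wrapper keyed by the payload's object_id.
  def produce(message)
    @client.set(@name, message)
    ProduceSketchMessage.new(message.object_id, message)
  end
end

produce_client = ProduceRecordingClient.new
produce_queue  = ProduceSketchQueue.new(produce_client, "jobs")
payload        = "resize image 42"
wrapped        = produce_queue.produce(payload)
```

Because the key is the payload's `object_id`, it identifies the object within the producing process only and says nothing about how the item is identified on the server.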