Class: Qup::Adapter::Kestrel::Queue

Inherits:
Destination show all
Includes:
QueueAPI
Defined in:
lib/qup/adapter/kestrel/queue.rb

Overview

Internal: The Implementation of Queue in the Kestrel Adapter

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QueueAPI

#consumer, #destroy, #producer

Methods inherited from Destination

#destroy, #ping

Constructor Details

#initialize(address, name) ⇒ Queue

Internal: Create a new Queue

address - the Connection Address string for the Kestrel Client name - the String name of the Topic

Returns a new Queue



16
17
18
19
# File 'lib/qup/adapter/kestrel/queue.rb', line 16

def initialize( address, name )
  super(address, name)
  @open_messages = {}
end

Instance Attribute Details

#nameObject (readonly)

Internal: The name of the Queue



22
23
24
# File 'lib/qup/adapter/kestrel/queue.rb', line 22

def name
  @name
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that message is completed and remove it from the Queue.

For Kestrel, this really just closes the last message, the message that is sent in does not matter.

Returns nothing

Raises:



85
86
87
88
89
# File 'lib/qup/adapter/kestrel/queue.rb', line 85

def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close_last_transaction
end

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message

A user of the Qup API should use a Consumer instance to retrieve items from the Queue.

Returns a Message



66
67
68
69
70
71
72
73
74
75
# File 'lib/qup/adapter/kestrel/queue.rb', line 66

def consume(&block)
  data = @client.get( @name )
  q_message = ::Qup::Message.new( data.object_id, data )
  @open_messages[q_message.key] = q_message
  if block_given? then
    yield_message( q_message, &block )
  else
    return q_message
  end
end

#depthObject

Internal: return the number of Messages on the Queue

Returns an integer of the Queue depth



35
36
37
38
# File 'lib/qup/adapter/kestrel/queue.rb', line 35

def depth
  stats = @admin_client.stat( @name )
  return stats['items']
end

#flushObject

Internal: Remove all messages from the Queue

Returns nothing.



27
28
29
# File 'lib/qup/adapter/kestrel/queue.rb', line 27

def flush
  @admin_client.flush(@name)
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

The ‘message’ that is passed in is wrapped in a Qup::Message before being stored.

A user of the Qup API should use a Producer instance to put items onto the queue.

Returns the Message that was put onto the Queue



52
53
54
55
# File 'lib/qup/adapter/kestrel/queue.rb', line 52

def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end