Class: Qup::Adapter::Redis::Queue

Inherits:
Connection show all
Includes:
QueueAPI
Defined in:
lib/qup/adapter/redis/queue.rb

Overview

Internal: The Qup implementation for a Redis Queue

Instance Attribute Summary

Attributes inherited from Connection

#name

Instance Method Summary collapse

Methods included from QueueAPI

#consumer, #name, #producer

Constructor Details

#initialize(uri, name) ⇒ Queue

Internal: create a new Queue

uri - the connection uri for the Redis Client name - the String name of the Queue

Returns a new Queue.



16
17
18
19
# File 'lib/qup/adapter/redis/queue.rb', line 16

def initialize( uri, name )
  super
  @open_messages = {}
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that message is completed and remove it from the Queue.

In redis, this doesn’t do anything at all. The tracking is only performed to meet the API requirements.

Returns nothing.

Raises:



52
53
54
55
# File 'lib/qup/adapter/redis/queue.rb', line 52

def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
end

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message

Returns a Message



75
76
77
78
79
80
81
82
83
84
# File 'lib/qup/adapter/redis/queue.rb', line 75

def consume(&block)
  queue_name, data = @client.brpop @name, 0 # blocking pop
  message = ::Qup::Message.new( data.object_id, data )
  @open_messages[message.key] = message
  if block_given? then
    yield_message( message, &block )
  else
    return message
  end
end

#depthObject

Internal: return the number of Messages on the Queue

Returns an integer of the Queue depth



41
42
43
# File 'lib/qup/adapter/redis/queue.rb', line 41

def depth
  @client.llen name
end

#destroyObject Also known as: flush

Internal: Destroy the queue

Removes the list from redis.

Returns nothing.



26
27
28
# File 'lib/qup/adapter/redis/queue.rb', line 26

def destroy
  @client.del name
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

The ‘message’ that is passed in is wrapped in a Qup::Message before being stored.

Returns the Message that was put onto the Queue



65
66
67
68
# File 'lib/qup/adapter/redis/queue.rb', line 65

def produce( message )
  @client.lpush name, message
  return ::Qup::Message.new( message.object_id, message )
end