Class: Qup::Adapter::Maildir::Queue

Inherits:
Object
  • Object
show all
Includes:
QueueAPI
Defined in:
lib/qup/adapter/maildir/queue.rb

Overview

Internal: The Qup Implementation in the Maildir Adapter

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QueueAPI

#consumer, #producer

Constructor Details

#initialize(root_path, name) ⇒ Queue

Internal: Create a new Queue

root_path - the root_path for this Queue to create under name - the String name of the Queue

Returns a new Queue.



19
20
21
22
23
24
# File 'lib/qup/adapter/maildir/queue.rb', line 19

def initialize( root_path, name )
  @root_path  = ::Pathname.new( root_path )
  @name       = name
  @queue_path = @root_path + @name
  @maildir    = ::Maildir.new( @queue_path, true )
end

Instance Attribute Details

#nameObject (readonly)

Internal: the name of the Queue



11
12
13
# File 'lib/qup/adapter/maildir/queue.rb', line 11

def name
  @name
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that message is completed and remove it from the Queue.

Returns nothing

Raises:



105
106
107
108
109
110
111
# File 'lib/qup/adapter/maildir/queue.rb', line 105

def acknowledge( message )
  md_message = @maildir.get( message.key )
  msg = "Message #{message.key} has not been processed yet"
  raise ::Qup::Error, msg unless md_message.dir == :cur
  raise ::Qup::Error, msg unless md_message.seen?
  md_message.destroy
end

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message

A user of the Qup API should use a Consumer instance to retrieve items from the Queue.

Returns a Message



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/qup/adapter/maildir/queue.rb', line 87

def consume(&block)
  msg = @maildir.list(:new, :limit => 1).first
  return nil if msg.nil?
  msg.process
  msg.seen!
  q_message = ::Qup::Message.new( msg.key, msg.data )
  if block_given? then
    yield_message( q_message, &block )
  else
    return q_message
  end
end

#depthObject

Internal: return the number of Messages on the Queue

Returns an integer of the Queue depth



51
52
53
54
55
56
57
58
59
# File 'lib/qup/adapter/maildir/queue.rb', line 51

def depth
  total = 0
  %w[ new cur ].each do |subdir|
    search_path = File.join( @maildir.path, subdir, '*' )
    keys = Dir.glob( search_path )
    total += keys.size
  end
  return total
end

#destroyObject

Internal: Remove the Queue from the system

Returns nothing.



30
31
32
# File 'lib/qup/adapter/maildir/queue.rb', line 30

def destroy
  @queue_path.rmtree
end

#flushObject

Internal: Remove all messages from the Queue

Returns nothing.



38
39
40
41
42
43
44
45
# File 'lib/qup/adapter/maildir/queue.rb', line 38

def flush
  ::Maildir::SUBDIRS.each do |sub|
    dir = Pathname.new( File.join( @maildir.path, sub.to_s ))
    dir.children.each do |p|
      p.delete if p.file?
    end
  end
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

The ‘message’ that is passed in is wrapped in a Qup::Message before being stored.

A user of the Qup API should use a Producer instance to put items onto the queue.

Returns the Message that was put onto the Queue



73
74
75
76
# File 'lib/qup/adapter/maildir/queue.rb', line 73

def produce( message )
  msg = @maildir.add( message )
  return ::Qup::Message.new( msg.key, msg.data )
end