Class: Qup::Adapter::Maildir::Queue
- Inherits: Object
- Includes: QueueAPI
- Defined in: lib/qup/adapter/maildir/queue.rb
Overview
Internal: The Qup Queue implementation in the Maildir Adapter.
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Internal: the name of the Queue.
Instance Method Summary
- #acknowledge(message) ⇒ Object
  Internal: Acknowledge that message is completed and remove it from the Queue.
- #consume(&block) ⇒ Object
  Internal: Retrieve a Message from the Queue.
- #depth ⇒ Object
  Internal: return the number of Messages on the Queue.
- #destroy ⇒ Object
  Internal: Remove the Queue from the system.
- #flush ⇒ Object
  Internal: Remove all messages from the Queue.
- #initialize(root_path, name) ⇒ Queue (constructor)
  Internal: Create a new Queue.
- #produce(message) ⇒ Object
  Internal: Put an item onto the Queue.
Methods included from QueueAPI
Constructor Details
#initialize(root_path, name) ⇒ Queue
Internal: Create a new Queue
root_path - the root path under which this Queue is created
name - the String name of the Queue
Returns a new Queue.
    # File 'lib/qup/adapter/maildir/queue.rb', line 19

    def initialize( root_path, name )
      @root_path  = ::Pathname.new( root_path )
      @name       = name
      @queue_path = @root_path + @name
      @maildir    = ::Maildir.new( @queue_path, true )
    end
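The queue directory is derived by joining the root path and the queue name with Pathname#+. The following is a minimal sketch of that join using only the standard library; the directory and queue names are hypothetical and chosen for illustration.

```ruby
require 'pathname'

# Hypothetical root directory and queue name, chosen only for illustration.
root_path  = ::Pathname.new( '/var/spool/qup' )
queue_path = root_path + 'jobs'

# Pathname#+ discards the left-hand side when the right-hand side is an
# absolute path, so an absolute "name" would land outside the root.
escaped_path = root_path + '/tmp/elsewhere'

puts queue_path
puts escaped_path
```

Because of the second case, a caller would probably want to pass a relative queue name.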
Instance Attribute Details
#name ⇒ Object (readonly)
Internal: the name of the Queue
    # File 'lib/qup/adapter/maildir/queue.rb', line 11

    def name
      @name
    end
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
Returns nothing.
    # File 'lib/qup/adapter/maildir/queue.rb', line 105

    def acknowledge( message )
      # The local variable name was stripped from this listing; md_message is a reconstruction.
      md_message = @maildir.get( message.key )
      msg = "Message #{md_message.key} has not been processed yet"
      raise ::Qup::Error, msg unless md_message.dir == :cur
      raise ::Qup::Error, msg unless md_message.seen?
      md_message.destroy
    end
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Yields a Message
A user of the Qup API should use a Consumer instance to retrieve items from the Queue.
Returns a Message when called without a block, or nil when the Queue has no new messages.
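    # File 'lib/qup/adapter/maildir/queue.rb', line 87

    def consume(&block)
      msg = @maildir.list(:new, :limit => 1).first
      return nil if msg.nil?
      msg.process
      msg.seen!
      # The local variable name was stripped from this listing; q_message is a reconstruction.
      q_message = ::Qup::Message.new( msg.key, msg.data )
      if block_given? then
        # The name of the helper called here is missing from this listing.
        […]( q_message, &block )
      else
        return q_message
      end
    end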
#depth ⇒ Object
Internal: return the number of Messages on the Queue
Returns the Integer depth of the Queue.
    # File 'lib/qup/adapter/maildir/queue.rb', line 51

    def depth
      total = 0
      %w[ new cur ].each do |subdir|
        search_path = File.join( @maildir.path, subdir, '*' )
        keys = Dir.glob( search_path )
        total += keys.size
      end
      return total
    end
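The depth is the number of entries in the new and cur subdirectories, so a message that has been consumed but not yet acknowledged still counts. The sketch below reproduces that counting with the standard library only; maildir_depth and the file names are hypothetical and not part of Qup, and moving a file from new to cur stands in for what consuming a message is assumed to do on disk.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper (not part of Qup): count the entries in the 'new' and
# 'cur' subdirectories of a Maildir-style directory, ignoring 'tmp'.
def maildir_depth( maildir_path )
  %w[ new cur ].sum do |subdir|
    Dir.glob( File.join( maildir_path, subdir, '*' ) ).size
  end
end

depth_before = nil
depth_after  = nil

Dir.mktmpdir do |root|
  %w[ tmp new cur ].each { |sub| FileUtils.mkdir_p( File.join( root, sub ) ) }

  File.write( File.join( root, 'new', 'a' ), 'first' )
  File.write( File.join( root, 'new', 'b' ), 'second' )
  File.write( File.join( root, 'tmp', 'c' ), 'still being written' )

  # Two entries in new/; the file in tmp/ is not counted.
  depth_before = maildir_depth( root )

  # Stand-in for consuming a message: move it from new/ to cur/.
  FileUtils.mv( File.join( root, 'new', 'a' ), File.join( root, 'cur', 'a' ) )
  depth_after = maildir_depth( root )
end

puts depth_before
puts depth_after
```

Both values are 2 in this sketch, since the moved file is still counted under cur.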
#destroy ⇒ Object
Internal: Remove the Queue from the system
Returns nothing.
    # File 'lib/qup/adapter/maildir/queue.rb', line 30

    def destroy
      @queue_path.rmtree
    end
#flush ⇒ Object
Internal: Remove all messages from the Queue
Returns nothing.
    # File 'lib/qup/adapter/maildir/queue.rb', line 38

    def flush
      ::Maildir::SUBDIRS.each do |sub|
        dir = Pathname.new( File.join( @maildir.path, sub.to_s ))
        dir.children.each do |p|
          p.delete if p.file?
        end
      end
    end
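Flushing deletes the files inside each subdirectory but leaves the subdirectories themselves, so the Queue remains usable afterwards. The following stdlib-only sketch shows the same pattern; DEMO_SUBDIRS is a hypothetical stand-in for ::Maildir::SUBDIRS, assumed here to name the three standard Maildir subdirectories, and flush_maildir is not part of Qup.

```ruby
require 'tmpdir'
require 'fileutils'
require 'pathname'

# Hypothetical stand-in for ::Maildir::SUBDIRS (an assumption about its value).
DEMO_SUBDIRS = [ :tmp, :new, :cur ]

# Hypothetical helper (not part of Qup): delete every regular file in each
# subdirectory while keeping the directories in place.
def flush_maildir( maildir_path )
  DEMO_SUBDIRS.each do |sub|
    dir = Pathname.new( File.join( maildir_path, sub.to_s ) )
    dir.children.each do |p|
      p.delete if p.file?
    end
  end
end

remaining_files = nil
remaining_dirs  = nil

Dir.mktmpdir do |root|
  DEMO_SUBDIRS.each { |sub| FileUtils.mkdir_p( File.join( root, sub.to_s ) ) }
  File.write( File.join( root, 'new', 'a' ), 'unread' )
  File.write( File.join( root, 'cur', 'b' ), 'read' )

  flush_maildir( root )

  remaining_files = Dir.glob( File.join( root, '*', '*' ) )
  remaining_dirs  = Dir.children( root ).sort
end

p remaining_files
p remaining_dirs
```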
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
The ‘message’ that is passed in is wrapped in a Qup::Message before being stored.
A user of the Qup API should use a Producer instance to put items onto the queue.
Returns the Message that was put onto the Queue
    # File 'lib/qup/adapter/maildir/queue.rb', line 73

    def produce( message )
      msg = @maildir.add( message )
      return ::Qup::Message.new( msg.key, msg.data )
    end
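Taken together, produce, consume, and acknowledge describe a lifecycle: a message is added as new, consuming it moves it to cur, and acknowledging it removes it. Acknowledging a message that has not been consumed raises an error. The sketch below is a hypothetical stdlib-only model of that lifecycle, not the Qup or Maildir implementation. ToyMaildirQueue, ToyQueueError, and the sequential file names are invented for illustration; the seen flag is omitted; writing to tmp before renaming into new follows the usual Maildir delivery convention rather than anything stated on this page.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical error class standing in for ::Qup::Error.
ToyQueueError = Class.new( StandardError )

# Hypothetical toy model of the message lifecycle; not the real Queue.
class ToyMaildirQueue
  Message = Struct.new( :key, :data )

  def initialize( path )
    @path = path
    @seq  = 0
    %w[ tmp new cur ].each { |sub| FileUtils.mkdir_p( File.join( @path, sub ) ) }
  end

  # Write into tmp/ first, then rename into new/ so a reader never sees a
  # partially written file.
  def produce( data )
    key = format( '%06d', @seq += 1 )
    tmp = File.join( @path, 'tmp', key )
    File.write( tmp, data )
    File.rename( tmp, File.join( @path, 'new', key ) )
    Message.new( key, data )
  end

  # Take the oldest entry in new/ and move it to cur/.
  def consume
    path = Dir.glob( File.join( @path, 'new', '*' ) ).sort.first
    return nil if path.nil?
    key    = File.basename( path )
    target = File.join( @path, 'cur', key )
    File.rename( path, target )
    Message.new( key, File.read( target ) )
  end

  # Only a message that has reached cur/ may be acknowledged.
  def acknowledge( message )
    path = File.join( @path, 'cur', message.key )
    unless File.file?( path )
      raise ToyQueueError, "Message #{message.key} has not been processed yet"
    end
    File.delete( path )
    nil
  end

  def depth
    %w[ new cur ].sum { |sub| Dir.glob( File.join( @path, sub, '*' ) ).size }
  end
end

events = []
Dir.mktmpdir do |root|
  queue    = ToyMaildirQueue.new( File.join( root, 'jobs' ) )
  produced = queue.produce( 'resize image 42' )
  events << queue.depth

  begin
    queue.acknowledge( produced )   # too early: still in new/
  rescue ToyQueueError => e
    events << e.message
  end

  consumed = queue.consume
  events << consumed.data
  events << queue.depth             # consumed but unacknowledged still counts
  queue.acknowledge( consumed )
  events << queue.depth
  events << queue.consume           # nothing left in new/
end

p events
```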