Module: RudeQ::ClassMethods
- Defined in:
- lib/rude_q.rb
Instance Method Summary
-
#backlog(queue_name = nil) ⇒ Object
A snapshot count of unprocessed items for the given queue_name.
-
#cleanup!(expiry = 1.hour) ⇒ Object
Clean up old processed items.
-
#data ⇒ Object
:nodoc:.
-
#data=(value) ⇒ Object
:nodoc:.
-
#fetch(queue_name, &block) ⇒ Object
Grab the first item from the queue and, if there is one, pass it to the supplied block. Returns the value of the block, or nil when the queue is empty.
-
#fetch_with_lock(qname, &block) ⇒ Object
:nodoc:.
-
#get(queue_name) ⇒ Object
Grab the first item from the queue queue_name (strings and symbols are treated the same). Data should always come out the same as it went in, and items should always come out in the order they went in. Returns nil if there is no unprocessed entry in the queue.
-
#queue_options ⇒ Object
Configure your RudeQ. :processed - what do we do after retrieving a queue item?
* :set_flag - set the processed flag to true (keep data in the db) [default]
* :destroy - destroy the processed item (keep our queue as lean as possible).
-
#set(queue_name, data) ⇒ Object
Add any serialize-able data to the queue queue_name (strings and symbols are treated the same).
RudeQueue.set(:sausage_queue, Sausage.new(:sauce => "yummy"))
RudeQueue.set("sausage_queue", Sausage.new(:other => true))
Instance Method Details
#backlog(queue_name = nil) ⇒ Object
A snapshot count of unprocessed items for the given queue_name
>> RudeQueue.backlog
-> 265
>> RudeQueue.backlog(:one_queue)
-> 212
>> RudeQueue.backlog(:another_queue)
-> 53
# File 'lib/rude_q.rb', line 105
def backlog(queue_name=nil)
  conditions = {:processed => false}
  if queue_name
    conditions[:queue_name] = sanitize_queue_name(queue_name)
  end
  self.count(:conditions => conditions)
end
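The counting rule can be tried without a database. The following is only a sketch under stated assumptions: `ITEMS` and `sketch_backlog` are hypothetical names, an Array of hashes stands in for the table, and `to_s` stands in for `sanitize_queue_name`, whose body is not shown on this page.

```ruby
# Hypothetical in-memory stand-in for the queue table (not part of RudeQ).
ITEMS = [
  { :queue_name => "one_queue",     :processed => false },
  { :queue_name => "one_queue",     :processed => true  },
  { :queue_name => "another_queue", :processed => false },
]

# Mirrors the shape of #backlog: always filter on unprocessed items,
# and narrow to a single queue only when a name is given.
def sketch_backlog(queue_name = nil)
  conditions = { :processed => false }
  # Assumption: queue names are normalised with to_s.
  conditions[:queue_name] = queue_name.to_s if queue_name
  ITEMS.count { |item| conditions.all? { |key, value| item[key] == value } }
end

puts sketch_backlog              # unprocessed items across all queues
puts sketch_backlog(:one_queue)  # unprocessed items in one queue
```

Because the queue filter is added only when a name is passed, the no-argument call counts across every queue.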
#cleanup!(expiry = 1.hour) ⇒ Object
Clean up old processed items: deletes every processed item whose updated_at is older than expiry (1 hour by default)
RudeQueue.cleanup!
RudeQueue.cleanup!(1.week)
# File 'lib/rude_q.rb', line 31
def cleanup!(expiry=1.hour)
  self.delete_all(["processed = ? AND updated_at < ?", true, expiry.to_i.ago])
end
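The selection rule in the SQL condition can be illustrated in plain Ruby. This is a sketch, not the library's code: `sketch_cleanup` is a hypothetical helper, the rows are hashes rather than records, and the expiry is given in plain seconds so that ActiveSupport is not required.

```ruby
# Hypothetical helper: returns the rows that would survive a cleanup.
# rows is an Array of hashes standing in for the queue table.
def sketch_cleanup(rows, expiry_seconds, now = Time.now)
  cutoff = now - expiry_seconds
  # Same rule as "processed = ? AND updated_at < ?" with true and the cutoff.
  rows.reject { |row| row[:processed] && row[:updated_at] < cutoff }
end

now = Time.now
rows = [
  { :id => 1, :processed => true,  :updated_at => now - 7200 }, # processed, 2 hours old
  { :id => 2, :processed => true,  :updated_at => now - 60 },   # processed, 1 minute old
  { :id => 3, :processed => false, :updated_at => now - 7200 }, # unprocessed, 2 hours old
]

survivors = sketch_cleanup(rows, 3600, now) # 3600 seconds stands in for 1.hour
p survivors.map { |row| row[:id] }          # prints [2, 3]
```

Only row 1 is removed: unprocessed items are kept however old they are, and recently processed items are kept until they pass the expiry.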
#data ⇒ Object
:nodoc:
# File 'lib/rude_q.rb', line 148
def data # :nodoc:
  YAML.load(self[:data])
end
#data=(value) ⇒ Object
:nodoc:
# File 'lib/rude_q.rb', line 151
def data=(value) # :nodoc:
  self[:data] = YAML.dump(value)
end
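The pair of accessors amounts to a YAML round trip through the data column. A minimal sketch of that round trip, using only the standard library: `round_trip` is a hypothetical helper, and the payload is limited to plain hashes, strings and integers so the result does not depend on YAML loader settings.

```ruby
require "yaml"

# Hypothetical helper: serialise as #data= would, then read back as #data would.
def round_trip(value)
  stored = YAML.dump(value) # the String that would be written to the column
  YAML.load(stored)         # the value handed back to the caller
end

payload = { "sauce" => "yummy", "count" => 255 }
puts YAML.dump(payload)
puts round_trip(payload) == payload
```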
#fetch(queue_name, &block) ⇒ Object
Grab the first item from the queue and, if there is one, pass it to the supplied block
- it will return the value of the block, or nil when the queue is empty
>> RudeQueue.fetch(:my_queue) do |data|
>> Monster.devour(data)
>> end
-> nil
>> status = RudeQueue.fetch(:my_queue) do |data|
>> process(data) # returns the value :update in this case
>> end
-> :update
>> status
-> :update
# File 'lib/rude_q.rb', line 90
def fetch(queue_name, &block)
  if data = get(queue_name)
    return block.call(data)
  end
end
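The control flow of fetch can be exercised with a small stand-in. This is a sketch under assumptions: `SketchQueue` is a hypothetical class, and an Array replaces the database-backed queue.

```ruby
# Hypothetical stand-in: an Array plays the role of the queue.
class SketchQueue
  def initialize(items)
    @items = items
  end

  # Stand-in for get: the first item, or nil when the queue is empty.
  def get
    @items.shift
  end

  # Same shape as #fetch: the block runs only when an item was found.
  def fetch(&block)
    if data = get
      return block.call(data)
    end
  end
end

queue  = SketchQueue.new(["job-1"])
first  = queue.fetch { |data| data.upcase } # block runs, its value is returned
second = queue.fetch { |data| data.upcase } # queue is empty, block is skipped
p first  # prints "JOB-1"
p second # prints nil
```

Since the guard is a plain truthiness check, a stored payload of false or nil would also skip the block in this sketch.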
#fetch_with_lock(qname, &block) ⇒ Object
:nodoc:
# File 'lib/rude_q.rb', line 113
def fetch_with_lock(qname, &block) # :nodoc:
  lock = case queue_options[:lock]
         when :pessimistic then RudeQ::PessimisticLock
         when :token then RudeQ::TokenLock
         else
           raise(ArgumentError, "bad queue_option for :lock - #{queue_options[:lock].inspect}")
         end
  lock.fetch_with_lock(self, qname, &block)
end
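The dispatch on the :lock option can be sketched in isolation. Everything named here is hypothetical: `SketchPessimisticLock` and `SketchTokenLock` are empty placeholders for the real lock modules, and `sketch_lock_for` takes the options hash as an argument instead of reading it from the class.

```ruby
# Empty placeholders for the real lock modules (assumptions for illustration).
module SketchPessimisticLock; end
module SketchTokenLock; end

# Picks a lock strategy from the options hash, rejecting unknown values.
def sketch_lock_for(options)
  case options[:lock]
  when :pessimistic then SketchPessimisticLock
  when :token       then SketchTokenLock
  else
    raise(ArgumentError, "bad queue_option for :lock - #{options[:lock].inspect}")
  end
end

p sketch_lock_for(:lock => :pessimistic)
begin
  sketch_lock_for(:lock => :optimistic)
rescue ArgumentError => error
  puts error.message
end
```

An unrecognised value fails loudly at fetch time rather than silently falling back to a default strategy.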
#get(queue_name) ⇒ Object
Grab the first item from the queue queue_name (strings and symbols are treated the same)
- it should always come out the same as it went in
- they should always come out in the same order they went in
- it will return a nil if there is no unprocessed entry in the queue
>> RudeQueue.get(21)
-> {:a => "hash"}
>> RudeQueue.get(:a_symbol)
-> 255
>> RudeQueue.get("a string")
-> nil
# File 'lib/rude_q.rb', line 63
def get(queue_name)
  qname = sanitize_queue_name(queue_name)
  fetch_with_lock(qname) do |record|
    if record
      processed!(record)
      return record.data
    else
      return nil # Starling waits indefinitely for a corresponding queue item
    end
  end
end
#queue_options ⇒ Object
configure your RudeQ
:processed - what do we do after retrieving a queue item?
- :set_flag - set the processed flag to true (keep data in the db) [default]
- :destroy - destroy the processed item (keep our queue as lean as possible)
:lock - what locking method should we use?
- :pessimistic - RudeQ::PessimisticLock [default]
- :token - RudeQ::TokenLock
# File 'lib/rude_q.rb', line 144
def queue_options
  @queue_options ||= {:processed => :set_flag, :lock => :pessimistic}
end
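Because the options hash is memoized, changing an entry on the returned hash persists for later calls. A minimal sketch, assuming a hypothetical `SketchModel` class that copies only the memoized accessor:

```ruby
# Hypothetical stand-in class; only the memoized accessor is reproduced.
class SketchModel
  def self.queue_options
    @queue_options ||= { :processed => :set_flag, :lock => :pessimistic }
  end
end

# The same Hash object is returned on every call, so edits stick.
SketchModel.queue_options[:processed] = :destroy
SketchModel.queue_options[:lock] = :token

puts SketchModel.queue_options[:processed]
puts SketchModel.queue_options[:lock]
```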
#set(queue_name, data) ⇒ Object
Add any serialize-able data to the queue queue_name (strings and symbols are treated the same)
RudeQueue.set(:sausage_queue, Sausage.new(:sauce => "yummy"))
RudeQueue.set("sausage_queue", Sausage.new(:other => true))
>> RudeQueue.get("sausage_queue")
-> *yummy sausage*
>> RudeQueue.get(:sausage_queue)
-> *other_sausage*
>> RudeQueue.get(:sausage_queue)
-> nil
# File 'lib/rude_q.rb', line 45
def set(queue_name, data)
  queue_name = sanitize_queue_name(queue_name)
  self.create!(:queue_name => queue_name, :data => data)
  return nil # in line with Starling
end
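The set/get contract shown above (first in, first out, with strings and symbols naming the same queue) can be sketched in memory. `SketchStore` is a hypothetical class, and `to_s` is an assumed stand-in for `sanitize_queue_name`.

```ruby
# Hypothetical in-memory stand-in for the database-backed queue.
class SketchStore
  def initialize
    # One Array per queue name, created on first use.
    @rows = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends to the end of the queue and returns nil, as set does.
  def set(queue_name, data)
    @rows[queue_name.to_s] << data
    nil
  end

  # Removes and returns the oldest item, or nil when the queue is empty.
  def get(queue_name)
    @rows[queue_name.to_s].shift
  end
end

store = SketchStore.new
store.set(:sausage_queue, "yummy sausage")
store.set("sausage_queue", "other sausage")
p store.get("sausage_queue") # prints "yummy sausage"
p store.get(:sausage_queue)  # prints "other sausage"
p store.get(:sausage_queue)  # prints nil
```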