Module: RudeQ::ClassMethods

Defined in:
lib/rude_q.rb

Instance Method Summary collapse

Instance Method Details

#backlog(queue_name = nil) ⇒ Object

A snapshot count of unprocessed items for the given queue_name

>> RudeQueue.backlog
-> 265
>> RudeQueue.backlog(:one_queue)
-> 212
>> RudeQueue.backlog(:another_queue)
-> 53


105
106
107
108
109
110
111
# File 'lib/rude_q.rb', line 105

# Counts the items still flagged as unprocessed.
#
# queue_name - optional queue to restrict the count to; it is passed through
#              sanitize_queue_name first. When omitted (nil/false) every
#              queue is counted.
#
# Returns whatever +count+ returns for the assembled conditions.
def backlog(queue_name=nil)
  filter = {:processed => false}
  filter[:queue_name] = sanitize_queue_name(queue_name) if queue_name
  count(:conditions => filter)
end

#cleanup!(expiry = 1.hour) ⇒ Object

Cleanup old processed items

RudeQueue.cleanup!
RudeQueue.cleanup!(1.week)


31
32
33
# File 'lib/rude_q.rb', line 31

# Deletes processed items whose updated_at is older than +expiry+.
#
# expiry - age threshold; it is converted with to_i and then turned into a
#          point in the past with +ago+ (defaults to 1.hour).
#
# Returns whatever +delete_all+ returns.
def cleanup!(expiry=1.hour)
  cutoff = expiry.to_i.ago
  delete_all(["processed = ? AND updated_at < ?", true, cutoff])
end

#data ⇒ Object

:nodoc:



148
149
150
# File 'lib/rude_q.rb', line 148

# Deserializes the YAML stored in the :data attribute back into a Ruby object.
#
# Since Psych 4 (bundled with Ruby 3.1+) YAML.load is a restricted loader and
# raises Psych::DisallowedClass for arbitrary objects, which would break the
# round-trip of anything written through #data=. Where the unrestricted
# loader is available it is used explicitly; older Rubies keep YAML.load,
# which is already unrestricted there.
#
# NOTE(review): unrestricted YAML loading can instantiate arbitrary classes.
# This assumes the column only ever holds payloads written via #data=;
# confirm before pointing this at untrusted data.
def data # :nodoc:
  raw = self[:data]
  if YAML.respond_to?(:unsafe_load)
    YAML.unsafe_load(raw)
  else
    YAML.load(raw)
  end
end

#data=(value) ⇒ Object

:nodoc:



151
152
153
# File 'lib/rude_q.rb', line 151

# Serializes +value+ to YAML and stores the resulting string in the :data
# attribute.
def data=(value) # :nodoc:
  serialized = YAML.dump(value)
  self[:data] = serialized
end

#fetch(queue_name, &block) ⇒ Object

Grab the first item from the queue, and execute the supplied block if there is one

- it will return the value of the block

>> RudeQueue.fetch(:my_queue) do |data|
>>   Monster.devour(data)
>> end
-> nil

>> status = RudeQueue.fetch(:my_queue) do |data|
>>   process(data) # returns the value :update in this case
>> end
-> :update
>> status
-> :update


90
91
92
93
94
# File 'lib/rude_q.rb', line 90

# Takes the next item from +queue_name+ (via #get) and passes it to the block.
#
# queue_name - the queue to read from.
# block      - optional; receives the item that was fetched.
#
# Returns the block's value when an item was fetched and a block was given,
# the item itself when no block was given, and nil when #get returned
# nothing (nil/false).
def fetch(queue_name, &block)
  item = get(queue_name)
  return unless item

  # By this point #get has already taken the item off the queue, so a missing
  # block must not blow up with NoMethodError on nil - hand the item back.
  block ? block.call(item) : item
end

#fetch_with_lock(qname, &block) ⇒ Object

:nodoc:



113
114
115
116
117
118
119
120
121
# File 'lib/rude_q.rb', line 113

# Delegates to the lock implementation selected by queue_options[:lock],
# passing self, +qname+ and the block straight through.
#
# Raises ArgumentError when the :lock option is neither :pessimistic nor
# :token.
def fetch_with_lock(qname, &block) # :nodoc:
  mode = queue_options[:lock]

  # The constants are resolved only in the branch that is taken.
  strategy =
    if mode == :pessimistic
      RudeQ::PessimisticLock
    elsif mode == :token
      RudeQ::TokenLock
    else
      raise(ArgumentError, "bad queue_option for :lock - #{mode.inspect}")
    end

  strategy.fetch_with_lock(self, qname, &block)
end

#get(queue_name) ⇒ Object

Grab the first item from the queue queue_name (strings and symbols are treated the same)

- it should always come out the same as it went in
- they should always come out in the same order they went in
- it will return a nil if there is no unprocessed entry in the queue

>> RudeQueue.get(21)
-> {:a => "hash"}
>> RudeQueue.get(:a_symbol)
-> 255
>> RudeQueue.get("a string")
-> nil


63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rude_q.rb', line 63

# Takes the next record for +queue_name+ under the configured lock, marks it
# via processed! and returns its deserialized data.
#
# Returns nil when the lock yields no record.
def get(queue_name)
  fetch_with_lock(sanitize_queue_name(queue_name)) do |item|
    # Starling waits indefinitely for a corresponding queue item; we just
    # answer nil.
    return nil unless item

    processed!(item)
    # Explicit return: exits #get directly from inside the lock's block.
    return item.data
  end
end

#queue_options ⇒ Object

configure your RudeQ

:processed - what do we do after retrieving a queue item?

  • :set_flag - set the processed flag to true (keep data in the db) [default]

  • :destroy - destroy the processed item (keep our queue as lean as possible)

:lock - what locking method should we use?

  • :pessimistic - RudeQ::PessimisticLock [default]

  • :token - RudeQ::TokenLock



144
145
146
# File 'lib/rude_q.rb', line 144

# Memoized configuration hash for this queue.
#
# Starts out as :processed => :set_flag and :lock => :pessimistic; the same
# hash object is returned on every call, so callers may mutate it to
# reconfigure the queue.
def queue_options
  return @queue_options if @queue_options

  @queue_options = {:processed => :set_flag, :lock => :pessimistic}
end

#set(queue_name, data) ⇒ Object

Add any serialize-able data to the queue queue_name (strings and symbols are treated the same)

RudeQueue.set(:sausage_queue, Sausage.new(:sauce => "yummy"))
RudeQueue.set("sausage_queue", Sausage.new(:other => true))

>> RudeQueue.get("sausage_queue")
-> *yummy sausage*
>> RudeQueue.get(:sausage_queue)
-> *other_sausage*
>> RudeQueue.get(:sausage_queue)
-> nil


45
46
47
48
49
50
# File 'lib/rude_q.rb', line 45

# Creates a new queue entry holding +data+ under the sanitized +queue_name+.
#
# Always returns nil (in line with Starling); anything create! raises is
# passed on to the caller.
def set(queue_name, data)
  create!(:queue_name => sanitize_queue_name(queue_name), :data => data)
  nil
end