Class: Edamame::PersistentQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/edamame/persistent_queue.rb

Direct Known Subclasses

Broker

Constant Summary collapse

# Settings used when the caller supplies none: a :beanstalk_queue transient
# queue at 'localhost:11100' and a :tyrant_store backing store at ':11101'.
DEFAULT_OPTIONS = {
  :queue => {
    :type => :beanstalk_queue,
    :uris => ['localhost:11100']
  },
  :store => {
    :type => :tyrant_store,
    :uri  => ':11101'
  }
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_options = {}) ⇒ PersistentQueue

Create a PersistentQueue with options

Parameters:

  • options (Hash)

    the options to create the queue with; they are deep-merged over DEFAULT_OPTIONS.



25
26
27
28
29
30
# File 'lib/edamame/persistent_queue.rb', line 25

# Builds a PersistentQueue from DEFAULT_OPTIONS overlaid with the caller's
# options, then creates the backing store followed by the transient queue.
#
# @param _options [Hash] overrides deep-merged onto DEFAULT_OPTIONS; the
#   :tube, :store and :queue entries are read here
def initialize(_options = {})
  @options = PersistentQueue::DEFAULT_OPTIONS.deep_merge(_options)
  # Fall back to the :default tube when none was requested.
  @tube = options[:tube] || :default
  store_config = options[:store]
  @store = Edamame::Store.create(store_config)
  # The queue is told which tube to use by default.
  queue_config = options[:queue].merge(:default_tube => @tube)
  @queue = Edamame::Queue.create(queue_config)
end

Instance Attribute Details

#optionsObject (readonly)

Hash of options used to create this Queue. Don’t mess with this after creating the object – it will be futile, at best.



9
10
11
# File 'lib/edamame/persistent_queue.rb', line 9

# Reader for the merged options hash assigned to @options.
#
# @return [Hash] the value stored in @options
def options; @options; end

#queueObject (readonly)

The priority queue to use, probably an Edamame::Queue



16
17
18
# File 'lib/edamame/persistent_queue.rb', line 16

# Reader for the transient queue object held in @queue.
#
# @return the value stored in @queue
def queue; @queue; end

#storeObject (readonly)

The database backing store to use, probably an Edamame::Store



14
15
16
# File 'lib/edamame/persistent_queue.rb', line 14

# Reader for the backing store object held in @store.
#
# @return the value stored in @store
def store; @store; end

#tubeObject

The default tube for the transient queue. The tube name must be purely alphanumeric.



12
13
14
# File 'lib/edamame/persistent_queue.rb', line 12

# Reader for the current default tube held in @tube.
#
# @return the value stored in @tube
def tube; @tube; end

Instance Method Details

#<<(job) ⇒ Object

Alias for put(job)



43
44
45
# File 'lib/edamame/persistent_queue.rb', line 43

# Shovel-style alias: hands the job to #put with no extra arguments.
#
# @param job the job to enqueue
# @return whatever #put returns
def <<(job)
  put(job)
end

#delete(job) ⇒ Object

Remove the job from the queue.



74
75
76
77
# File 'lib/edamame/persistent_queue.rb', line 74

# Removes a job from both halves of the persistent queue: first its record
# in the backing store (looked up by job.key), then its entry in the
# transient queue (job.qjob).
#
# @param job an object responding to #key and #qjob
# @return whatever queue.delete returns
def delete(job)
  store.delete(job.key)
  queue.delete(job.qjob)
end

#each(klass = nil, &block) ⇒ Object

Returns each job as it appears in the queue.

All jobs – active, inactive, running, etc – are returned, and in some arbitrary order.



96
97
98
99
100
101
# File 'lib/edamame/persistent_queue.rb', line 96

# Iterates over the backing store, yielding each job to the caller's block.
# The store key that accompanies each job is not passed on.
#
# @param klass the class passed to store.each_as; Edamame::Job is used
#   when this is nil or false
# @yield [job] each job supplied by store.each_as
# @return whatever store.each_as returns
def each(klass = nil, &block)
  job_class = klass || Edamame::Job
  store.each_as(job_class) { |_key, job| yield job }
end

#get(key, klass = nil) ⇒ Object

Retrieve named record



55
56
57
58
59
# File 'lib/edamame/persistent_queue.rb', line 55

# Fetches the stored record for +key+ and builds a job object from it.
#
# @param key the key handed to store.get
# @param klass the class whose .from_hash builds the result; Edamame::Job
#   is used when this is nil or false
# @return the object built by klass.from_hash, or nil when store.get
#   returns nil/false
def get(key, klass = nil)
  job_class = klass || Edamame::Job
  record = store.get(key)
  return unless record
  job_class.from_hash(record)
end

#load(&block) ⇒ Object

Loads all jobs from the backing store into the queue.



106
107
108
109
110
111
112
113
114
# File 'lib/edamame/persistent_queue.rb', line 106

# Runs #hoard over the jobs, then #unhoard. For every job handed over by
# #hoard the caller's block (if any) is invoked, and a warning is emitted
# when the backing store has no entry for the job's key. The same block is
# then forwarded to #unhoard.
#
# @yield [job] each job produced by #hoard (optional)
# @return whatever #unhoard returns
def load(&block)
  hoard do |job|
    yield(job) unless block.nil?
    warn "Missing job: #{job.inspect}" unless store.include?(job.key)
  end
  unhoard(&block)
end

#put(job, *args) ⇒ Object

Add a new Job to the queue



35
36
37
38
39
40
41
# File 'lib/edamame/persistent_queue.rb', line 35

# Adds a new job: saves it in the backing store and puts it on the
# transient queue, unless the store already holds an entry for job.key.
#
# @param job the job to add; must respond to #tube, #tube= and #key
# @param args extra arguments passed straight through to queue.put
# @return whatever queue.put returns, or nil when the job's key is already
#   in the store
def put(job, *args)
  # A job with a blank tube takes on this queue's current tube ...
  job.tube = self.tube if job.tube.blank?
  # ... and the job's tube then becomes this queue's current tube.
  self.tube = job.tube
  unless store.include?(job.key)
    store.save(job)
    queue.put(job, *args)
  end
end

#release(job) ⇒ Object

Returns the job to the queue, to be re-run later.

release’ing a job acknowledges it was completed, successfully or not



84
85
86
87
88
# File 'lib/edamame/persistent_queue.rb', line 84

# Hands a job back to the queue: calls job.update!, saves the updated job
# to the backing store, then releases its queue entry with the job's
# priority and its scheduling delay.
#
# @param job must respond to #update!, #qjob, #priority and #scheduling
#   (whose result responds to #delay)
# @return whatever queue.release returns
def release(job)
  job.update!
  store.save(job)
  queue.release(
    job.qjob,
    job.priority,
    job.scheduling.delay
  )
end

#reserve(timeout = nil, klass = nil) ⇒ Object

Request a job from the queue for processing



64
65
66
67
68
69
# File 'lib/edamame/persistent_queue.rb', line 64

# Reserves the next entry from the transient queue and returns the matching
# job from the backing store, with the queue entry attached as job.qjob.
#
# @param timeout passed to queue.reserve
# @param klass passed to #get to choose the class of the returned job
# @return the job, or nil when queue.reserve yields nothing or #get finds
#   no record for the reserved entry's key
def reserve(timeout = nil, klass = nil)
  qjob = queue.reserve(timeout)
  return unless qjob
  job = get(qjob.key, klass)
  # NOTE(review): when no record is found the reserved qjob is not touched
  # here — confirm that is intended.
  return unless job
  job.qjob = qjob
  job
end

#statsObject

Returns a hash of stats about the store and queue



117
118
119
120
121
# File 'lib/edamame/persistent_queue.rb', line 117

# Collects a status snapshot of this persistent queue.
#
# @return [Hash] with :store_stats (store.stats), :queue_stats (queue.stats)
#   and :tube (the current tube), in that order
def stats
  report = {}
  report[:store_stats] = store.stats
  report[:queue_stats] = queue.stats
  report[:tube] = self.tube
  report
end