Class: Edamame::PersistentQueue
- Inherits: Object
  - Object
    - Edamame::PersistentQueue
- Defined in: lib/edamame/persistent_queue.rb
Direct Known Subclasses
Constant Summary
- DEFAULT_OPTIONS =
    {
      :queue => { :type => :beanstalk_queue, :uris => ['localhost:11100'] },
      :store => { :type => :tyrant_store, :uri => ':11101' }
    }
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Hash of options used to create this Queue.
- #queue ⇒ Object (readonly)
  The priority queue to use, probably an Edamame::Queue.
- #store ⇒ Object (readonly)
  The database backing store to use, probably an Edamame::Store.
- #tube ⇒ Object
  The default tube for the transient queue. The tube name must be purely alphanumeric.
Instance Method Summary
- #<<(job) ⇒ Object
  Alias for put(job).
- #delete(job) ⇒ Object
  Remove the job from the queue.
- #each(klass = nil, &block) ⇒ Object
  Returns each job as it appears in the queue.
- #get(key, klass = nil) ⇒ Object
  Retrieve named record.
- #initialize(_options = {}) ⇒ PersistentQueue (constructor)
  Create a PersistentQueue with options.
- #load(&block) ⇒ Object
  Loads all jobs from the backing store into the queue.
- #put(job, *args) ⇒ Object
  Add a new Job to the queue.
- #release(job) ⇒ Object
  Returns the job to the queue, to be re-run later.
- #reserve(timeout = nil, klass = nil) ⇒ Object
  Request a job from the queue for processing.
- #stats ⇒ Object
  Returns a hash of stats about the store and queue.
Constructor Details
#initialize(_options = {}) ⇒ PersistentQueue
Create a PersistentQueue with options.
    # File 'lib/edamame/persistent_queue.rb', line 25
    def initialize _options={}
      @options = PersistentQueue::DEFAULT_OPTIONS.deep_merge(_options)
      @tube    = options[:tube] || :default
      @store   = Edamame::Store.create options[:store]
      @queue   = Edamame::Queue.create options[:queue].merge(:default_tube => @tube)
    end
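The constructor layers caller-supplied options over DEFAULT_OPTIONS with a deep merge, so overriding one nested key leaves its siblings at their defaults. The following is a minimal sketch of that merge behaviour only. `deep_merge_sketch` and `SKETCH_DEFAULT_OPTIONS` are hypothetical names introduced for illustration; the real class calls `Hash#deep_merge`, which is assumed to be supplied by a core-extension library rather than by Ruby itself.

```ruby
# Copy of the documented defaults, under a hypothetical constant name.
SKETCH_DEFAULT_OPTIONS = {
  :queue => { :type => :beanstalk_queue, :uris => ['localhost:11100'] },
  :store => { :type => :tyrant_store,    :uri  => ':11101' }
}

# Hypothetical stand-in for Hash#deep_merge: merge nested hashes
# recursively, letting values from +overrides+ win on conflicts.
def deep_merge_sketch(base, overrides)
  base.merge(overrides) do |_key, old_val, new_val|
    if old_val.is_a?(Hash) && new_val.is_a?(Hash)
      deep_merge_sketch(old_val, new_val)
    else
      new_val
    end
  end
end

merged = deep_merge_sketch(
  SKETCH_DEFAULT_OPTIONS,
  { :tube => :crawl, :store => { :uri => ':22222' } }
)

p merged[:store]  # :type keeps its default, only :uri is replaced
p merged[:queue]  # untouched defaults
p merged[:tube]   # new top-level key from the caller
```

A shallow `Hash#merge` would instead replace the whole `:store` hash and drop `:type`, which is presumably why the constructor uses a deep merge.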
Instance Attribute Details
#options ⇒ Object (readonly)
Hash of options used to create this Queue. Don’t mess with this after creating the object – it will be futile, at best.
    # File 'lib/edamame/persistent_queue.rb', line 9
    def options
      @options
    end
#queue ⇒ Object (readonly)
The priority queue to use, probably an Edamame::Queue.
    # File 'lib/edamame/persistent_queue.rb', line 16
    def queue
      @queue
    end
#store ⇒ Object (readonly)
The database backing store to use, probably an Edamame::Store.
    # File 'lib/edamame/persistent_queue.rb', line 14
    def store
      @store
    end
#tube ⇒ Object
The default tube for the transient queue. The tube name must be purely alphanumeric.
    # File 'lib/edamame/persistent_queue.rb', line 12
    def tube
      @tube
    end
Instance Method Details
#<<(job) ⇒ Object
Alias for put(job).
    # File 'lib/edamame/persistent_queue.rb', line 43
    def << job
      put job
    end
#delete(job) ⇒ Object
Remove the job from the queue.
    # File 'lib/edamame/persistent_queue.rb', line 74
    def delete job
      store.delete job.key
      queue.delete job.qjob
    end
#each(klass = nil, &block) ⇒ Object
Returns each job as it appears in the queue.
All jobs (active, inactive, running, etc.) are returned, in arbitrary order.
    # File 'lib/edamame/persistent_queue.rb', line 96
    def each klass=nil, &block
      klass ||= Edamame::Job
      store.each_as(klass) do |key, job|
        yield job
      end
    end
#get(key, klass = nil) ⇒ Object
Retrieve named record.
    # File 'lib/edamame/persistent_queue.rb', line 55
    def get key, klass=nil
      klass ||= Edamame::Job
      hsh = store.get(key) or return
      klass.from_hash hsh
    end
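The listing for #get relies on the `value or return` idiom: when the store has nothing under the key, the method exits early with nil; otherwise the stored hash is rebuilt into an object through `klass.from_hash`. Here is a small sketch of that control flow, assuming a plain Hash in place of the store. `SketchRecord`, `SKETCH_STORE`, and `sketch_get` are hypothetical names, not part of Edamame.

```ruby
# Hypothetical record class; the documented default is Edamame::Job.
class SketchRecord
  attr_reader :key, :priority

  def initialize(key, priority)
    @key = key
    @priority = priority
  end

  # Rebuild an object from the hash kept in the store.
  def self.from_hash(hsh)
    new(hsh[:key], hsh[:priority])
  end
end

# A plain Hash stands in for the store; Hash#[] plays the role of store.get.
SKETCH_STORE = { 'job-1' => { :key => 'job-1', :priority => 10 } }

def sketch_get(key, klass = nil)
  klass ||= SketchRecord
  hsh = SKETCH_STORE[key] or return   # missing key: return nil early
  klass.from_hash hsh
end

p sketch_get('job-1').priority
p sketch_get('no-such-key')
```

Callers therefore need to handle a nil result, which is what #reserve does with its own `or return`.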
#load(&block) ⇒ Object
Loads all jobs from the backing store into the queue.
    # File 'lib/edamame/persistent_queue.rb', line 106
    def load &block
      hoard do |job|
        yield(job) if block
        unless store.include?(job.key)
          warn "Missing job: #{job.inspect}"
        end
      end
      unhoard &block
    end
#put(job, *args) ⇒ Object
Add a new Job to the queue.
    # File 'lib/edamame/persistent_queue.rb', line 35
    def put job, *args
      job.tube = self.tube if job.tube.blank?
      self.tube = job.tube
      return if store.include?(job.key)
      store.save job
      queue.put job, *args
    end
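Two details of #put are easy to miss in the listing: a job whose key is already in the store is silently skipped, and the queue's default tube is reassigned to the tube of whatever job was put last. The sketch below reproduces those steps with in-memory stand-ins. Every `Sketch*` name is hypothetical, and the nil/empty check is an assumed substitute for `blank?`, which is not part of core Ruby.

```ruby
# In-memory stand-ins, NOT the real Edamame::Store / Edamame::Queue.
SketchJob = Struct.new(:key, :tube)

class SketchStore
  def initialize
    @records = {}
  end

  def include?(key)
    @records.key?(key)
  end

  def save(job)
    @records[job.key] = job
  end
end

class SketchQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  def put(job, *_args)
    @jobs << job
  end
end

class SketchPersistentQueue
  attr_accessor :tube
  attr_reader :store, :queue

  def initialize
    @tube  = :default
    @store = SketchStore.new
    @queue = SketchQueue.new
  end

  # Same steps as the documented #put.
  def put(job, *args)
    job.tube = tube if job.tube.nil? || job.tube.to_s.empty?
    self.tube = job.tube                # default tube follows the last job
    return if store.include?(job.key)   # duplicate key: do nothing
    store.save job
    queue.put job, *args
  end
end

pq = SketchPersistentQueue.new
pq.put SketchJob.new('http://example.com/a', nil)
pq.put SketchJob.new('http://example.com/a', nil)    # same key, skipped
pq.put SketchJob.new('http://example.com/b', :crawl)
puts pq.queue.jobs.size
puts pq.tube.inspect
```

Note that the tube reassignment happens before the duplicate check, so even a skipped job changes the default tube.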
#release(job) ⇒ Object
Returns the job to the queue, to be re-run later.
Releasing a job acknowledges that it was completed, successfully or not.
    # File 'lib/edamame/persistent_queue.rb', line 84
    def release job
      job.update!
      store.save job
      queue.release job.qjob, job.priority, job.scheduling.delay
    end
#reserve(timeout = nil, klass = nil) ⇒ Object
Request a job from the queue for processing.
    # File 'lib/edamame/persistent_queue.rb', line 64
    def reserve timeout=nil, klass=nil
      qjob = queue.reserve(timeout) or return
      job  = get(qjob.key, klass)   or return
      job.qjob = qjob
      job
    end
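As listed, #reserve can return nil for two different reasons: the queue handed back nothing (for example, the timeout elapsed), or the queue entry's key has no matching record in the store. Only when both lookups succeed is the queue-side handle attached to the job as `qjob`. The following sketch mirrors that flow with arrays and hashes; `SketchQJob`, `SketchWork`, and `SketchReserver` are hypothetical stand-ins, and timeouts are not modelled.

```ruby
SketchQJob = Struct.new(:key)         # stand-in for the queue-side handle
SketchWork = Struct.new(:key, :qjob)  # stand-in for Edamame::Job

class SketchReserver
  # pending_keys: keys waiting in the queue, in order
  # records:      key => hash, playing the role of the backing store
  def initialize(pending_keys, records)
    @pending = pending_keys.map { |k| SketchQJob.new(k) }
    @records = records
  end

  def reserve
    qjob = @pending.shift or return      # nothing queued: nil
    hsh  = @records[qjob.key] or return  # queued but not stored: nil
    job  = SketchWork.new(hsh[:key])
    job.qjob = qjob                      # keep the handle for release/delete
    job
  end
end

reserver = SketchReserver.new(['a', 'ghost'], { 'a' => { :key => 'a' } })
p reserver.reserve   # job 'a' with its qjob attached
p reserver.reserve   # 'ghost' has no stored record
p reserver.reserve   # queue is empty
```

In the second case the documented method returns without releasing or deleting the reserved queue entry; what happens to that reservation afterwards depends on the underlying queue and is not covered on this page.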
#stats ⇒ Object
Returns a hash of stats about the store and queue.
    # File 'lib/edamame/persistent_queue.rb', line 117
    def stats
      { :store_stats => store.stats,
        :queue_stats => queue.stats,
        :tube => self.tube }
    end