Class: Chimp::ChimpQueue
- Inherits:
-
Object
- Object
- Chimp::ChimpQueue
- Includes:
- Singleton
- Defined in:
- lib/right_chimp/queue/chimp_queue.rb
Overview
The ChimpQueue is a singleton that contains the chimp work queue
Instance Attribute Summary collapse
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#group ⇒ Object
Returns the value of attribute group.
-
#max_threads ⇒ Object
Returns the value of attribute max_threads.
-
#processing ⇒ Object
Returns the value of attribute processing.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
Class Method Summary collapse
-
.[](group) ⇒ Object
Allow groups to be accessed as ChimpQueue[group].
- .[]=(k, v) ⇒ Object
Instance Method Summary collapse
- #create_group(name, type = :parallel, concurrency = 1) ⇒ Object
- #get_job(id) ⇒ Object
- #get_jobs ⇒ Object
-
#get_jobs_by_status(status) ⇒ Object
Return an array of all jobs with the requested status.
- #get_jobs_by_uuid(uuid) ⇒ Object
-
#initialize ⇒ ChimpQueue
constructor
A new instance of ChimpQueue.
-
#push(g, w) ⇒ Object
Push a task into the queue.
-
#quit ⇒ Object
Quit - empty the queue and wait for remaining jobs to complete.
-
#reset! ⇒ Object
Reset the queue and the :default group.
-
#run_threads ⇒ Object
Join each worker thread for up to 5 seconds (used by chimpd).
-
#shift ⇒ Object
Grab the oldest work item available.
-
#size ⇒ Object
Return the total number of queued (non-executing) objects.
-
#start ⇒ Object
Start up queue runners.
-
#wait_until_done(g, &block) ⇒ Object
Wait until a group is done.
Constructor Details
#initialize ⇒ ChimpQueue
Returns a new instance of ChimpQueue.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 11
#
# Build the queue in its initial state: delay 0, retry count 0, a
# max_threads value of 10, no threads spawned yet, an empty processing
# table, and a group table populated by reset!.
def initialize
  @processing = {}
  @semaphore = Mutex.new
  @threads = []
  @workers_never_exit = true
  @max_threads = 10
  @retry_count = 0
  @delay = 0
  reset!
end
Instance Attribute Details
#delay ⇒ Object
Returns the value of attribute delay.
9 10 11 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9
#
# Reader for the @delay instance variable.
def delay
  @delay
end
#group ⇒ Object
Returns the value of attribute group.
9 10 11 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9
#
# Reader for the @group instance variable (the table of execution groups).
def group
  @group
end
#max_threads ⇒ Object
Returns the value of attribute max_threads.
9 10 11 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9
#
# Reader for the @max_threads instance variable.
def max_threads
  @max_threads
end
#processing ⇒ Object
Returns the value of attribute processing.
9 10 11 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9
#
# Reader for the @processing instance variable.
def processing
  @processing
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
9 10 11 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9
#
# Reader for the @retry_count instance variable.
def retry_count
  @retry_count
end
Class Method Details
.[](group) ⇒ Object
Allow groups to be accessed as ChimpQueue[group].
139 140 141 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 139
#
# Class-level lookup so that a group can be fetched as ChimpQueue[key].
#
# group - key to look up in the singleton instance's group table.
#
# Returns whatever that table holds under the key.
def self.[](group)
  registry = ChimpQueue.instance.group
  registry[group]
end
.[]=(k, v) ⇒ Object
143 144 145 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 143
#
# Class-level assignment so that a group can be stored as ChimpQueue[k] = v.
#
# k - key under which to store the group in the singleton's group table.
# v - the value to store.
def self.[]=(k, v)
  registry = ChimpQueue.instance.group
  registry[k] = v
end
Instance Method Details
#create_group(name, type = :parallel, concurrency = 1) ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 58
#
# Build an execution group via ExecutionGroupFactory and register it under
# +name+ in the ChimpQueue group table.
#
# name        - key for the group; also assigned as its group_id.
# type        - passed to ExecutionGroupFactory.from_type (default :parallel).
# concurrency - assigned to the new group's concurrency (default 1).
#
# Returns the newly registered group.
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"

  fresh = ExecutionGroupFactory.from_type(type).tap do |g|
    g.group_id = name
    g.concurrency = concurrency
  end
  ChimpQueue[name] = fresh
end
#get_job(id) ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 163
#
# Look up a single job by id across every group.
#
# id - the job id; it is converted with to_i before comparison, so numeric
#      strings are accepted.
#
# Returns the first job whose job_id equals id.to_i, or nil when no job
# matches. (Previously a miss fell through Array#each and returned the
# entire jobs array, which is truthy.)
def get_job(id)
  get_jobs.find { |job| job.job_id == id.to_i }
end
#get_jobs ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 171
#
# Collect the jobs of every group into one flat array.
#
# Returns a new Array holding each group's get_jobs result concatenated in
# the group table's iteration order; empty when there are no jobs.
def get_jobs
  @group.values.flat_map { |group| group.get_jobs }
end
#get_jobs_by_status(status) ⇒ Object
Return an array of all jobs with the requested status.
151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 151
#
# Return an array of all jobs with the requested status.
#
# status - forwarded unchanged to each group's get_jobs_by_status.
#
# Returns a new Array with every group's matches concatenated. A group that
# answers nil or an empty array contributes nothing.
def get_jobs_by_status(status)
  @group.values.flat_map { |group| group.get_jobs_by_status(status) || [] }
end
#get_jobs_by_uuid(uuid) ⇒ Object
180 181 182 183 184 185 186 187 188 189 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 180
#
# Find every job, across all groups, that carries the given uuid.
#
# uuid - value compared with == against each job's job_uuid.
#
# Returns an Array of the matching jobs (empty when none match).
def get_jobs_by_uuid(uuid)
  get_jobs.select { |job| job.job_uuid == uuid }
end
#push(g, w) ⇒ Object
Push a task into the queue
51 52 53 54 55 56 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 51
#
# Push a task into the queue.
#
# g - key of the target group; the group is created with create_group's
#     defaults when ChimpQueue[g] is not set yet.
# w - the work item; it is skipped when the group already has a job with
#     the same job_id.
#
# Raises RuntimeError ("no group specified") when g is nil or false.
def push(g, w)
  raise "no group specified" unless g

  create_group(g) unless ChimpQueue[g]

  target = ChimpQueue[g]
  target.push(w) unless target.get_job(w.job_id)
end
#quit ⇒ Object
Quit - empty the queue and wait for remaining jobs to complete
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 98
#
# Wait for each group to finish, then kill every worker thread.
#
# Each time wait_until_done yields, this sleeps one second and prints a
# dot. The counter is shared across all groups, so after 30 such pauses in
# total the wait for the current (and every later) group is abandoned via
# break. Finally prints " done.".
def quit
  pauses = 0

  @group.keys.each do |name|
    wait_until_done(name) do
      break unless pauses < 30

      sleep 1
      pauses += 1
      print "."
    end
  end

  @threads.each(&:kill)
  puts " done."
end
#reset! ⇒ Object
Reset the queue and the :default group
This doesn’t do anything to the groups’ jobs.
27 28 29 30 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 27
#
# Replace the group table with a fresh one containing only a new :default
# ParallelExecutionGroup. The previous table is simply dropped.
#
# Returns the new :default group.
def reset!
  @group = {}
  @group.store(:default, ParallelExecutionGroup.new(:default))
end
#run_threads ⇒ Object
Join each worker thread for up to 5 seconds (used by chimpd)
119 120 121 122 123 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 119
#
# Join each worker thread in turn, waiting at most 5 seconds per thread.
# This makes a single pass over @threads and then returns.
#
# Returns the @threads array.
def run_threads
  @threads.each { |worker_thread| worker_thread.join(5) }
end
#shift ⇒ Object
Grab the oldest work item available
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 69
#
# Take the next job from the first group that reports ready?, while
# holding the queue's mutex.
#
# Only that first ready group is consulted: if its shift returns nil, no
# further groups are tried.
#
# Returns the shifted job, or nil when no group is ready.
def shift
  @semaphore.synchronize do
    candidate = @group.values.find { |group| group.ready? }
    job = candidate.shift if candidate
    Log.debug "Shifting job '#{job.job_id}' from group '#{candidate.group_id}'" unless job.nil?
    job
  end
end
#size ⇒ Object
Return the total number of queued (non-executing) objects
128 129 130 131 132 133 134 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 128
#
# Total of every group's size.
#
# Returns the sum of group.size over all groups; 0 when there are no groups.
def size
  @group.values.inject(0) { |total, group| total + group.size }
end
#start ⇒ Object
Start up queue runners
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 35
#
# Start up queue runners.
#
# Sorts the queues, then spawns max_threads threads. Each thread builds its
# own QueueWorker, copies this queue's delay and retry_count onto it, and
# calls run. The threads are appended to @threads.
def start
  sort_queues!

  # Range#each rather than `for`: the index was never used and the block
  # passed to Thread.new takes no parameter.
  (1..max_threads).each do
    @threads << Thread.new do
      worker = QueueWorker.new
      worker.delay = @delay
      worker.retry_count = @retry_count
      worker.run
    end
  end
end
#wait_until_done(g, &block) ⇒ Object
Wait until a group is done
86 87 88 89 90 91 92 93 |
# File 'lib/right_chimp/queue/chimp_queue.rb', line 86
#
# Wait until a group is done.
#
# While the group reports running?, each worker thread is joined for up to
# one second and the optional block is called after each join. A `break`
# inside the block abandons the wait.
#
# g     - key of the group in the group table.
# block - optional; called once per joined thread on every pass.
#
# With no worker threads there is nothing to join, so each pass calls the
# block once (or sleeps a second when no block was given). This stops the
# loop spinning hot and lets the caller's timeout logic run.
def wait_until_done(g, &block)
  while @group[g].running?
    if @threads.empty?
      block_given? ? yield : sleep(1)
    else
      @threads.each do |t|
        t.join(1)
        yield if block_given?
      end
    end
  end
end