Class: Chimp::ExecutionGroup
- Inherits:
-
Object
- Object
- Chimp::ExecutionGroup
- Defined in:
- lib/right_chimp/queue/execution_group.rb
Overview
An ExecutionGroup contains a set of Executors to be processed
Only the subclasses SerialExecutionGroup and ParallelExecutionGroup should be used directly.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
Returns the value of attribute concurrency.
-
#description ⇒ Object
Returns the value of attribute description.
-
#group_id ⇒ Object
Returns the value of attribute group_id.
-
#started ⇒ Object
Returns the value of attribute started.
-
#time_end ⇒ Object
readonly
Returns the value of attribute time_end.
-
#time_start ⇒ Object
readonly
Returns the value of attribute time_start.
Instance Method Summary collapse
-
#cancel(id) ⇒ Object
Cancel a job by id.
-
#done? ⇒ Boolean
An execution group is “done” if nothing is queued or running and at least one job has completed.
-
#get_job(i) ⇒ Object
Get a particular job.
-
#get_job_ids ⇒ Object
Get all job ids.
-
#get_jobs ⇒ Object
Get all jobs.
-
#get_jobs_by_status(status) ⇒ Object
Get jobs by status.
-
#get_total_exec_time ⇒ Object
Return total execution time.
-
#initialize(new_group_id = nil) ⇒ ExecutionGroup
constructor
A new instance of ExecutionGroup.
- #job_completed ⇒ Object
-
#push(j) ⇒ Object
Add something to the work queue.
-
#queue(id) ⇒ Object
Queue a held job by id.
-
#ready? ⇒ Boolean
An execution group is “ready” if it has work that can be done; see implementation in child classes.
-
#requeue(id) ⇒ Object
Requeue a job by id.
-
#requeue_failed_jobs! ⇒ Object
Requeue all failed jobs.
-
#reset! ⇒ Object
Reset the queue.
-
#results ⇒ Object
Return a hash of the results.
-
#running? ⇒ Boolean
Is this execution group running anything?.
-
#set_jobs(jobs = []) ⇒ Object
Reset all jobs and bulk set them.
-
#shift ⇒ Object
Take something from the queue.
-
#size ⇒ Object
Size of the active queue.
-
#sort! ⇒ Object
Sort queue by server nickname.
-
#to_s ⇒ Object
Print out ExecutionGroup information.
Constructor Details
#initialize(new_group_id = nil) ⇒ ExecutionGroup
Returns a new instance of ExecutionGroup.
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/right_chimp/queue/execution_group.rb', line 28 def initialize(new_group_id=nil) @group_id = new_group_id @queue = [] @jobs_by_id = {} @log = nil @time_start = nil @time_end = nil @concurrency = 1 @started = 0 end |
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency.
25 26 27 |
# File 'lib/right_chimp/queue/execution_group.rb', line 25 def concurrency @concurrency end |
#description ⇒ Object
Returns the value of attribute description.
25 26 27 |
# File 'lib/right_chimp/queue/execution_group.rb', line 25 def description @description end |
#group_id ⇒ Object
Returns the value of attribute group_id.
25 26 27 |
# File 'lib/right_chimp/queue/execution_group.rb', line 25 def group_id @group_id end |
#started ⇒ Object
Returns the value of attribute started.
25 26 27 |
# File 'lib/right_chimp/queue/execution_group.rb', line 25 def started @started end |
#time_end ⇒ Object (readonly)
Returns the value of attribute time_end.
26 27 28 |
# File 'lib/right_chimp/queue/execution_group.rb', line 26 def time_end @time_end end |
#time_start ⇒ Object (readonly)
Returns the value of attribute time_start.
26 27 28 |
# File 'lib/right_chimp/queue/execution_group.rb', line 26 def time_start @time_start end |
Instance Method Details
#cancel(id) ⇒ Object
Cancel a job by id
228 229 230 231 232 233 234 235 |
# File 'lib/right_chimp/queue/execution_group.rb', line 228 def cancel(id) Log.warn "Cancelling job id #{id}" job = @jobs_by_id[id] job.status = Executor::STATUS_ERROR job.owner = nil job.time_end = Time.now @queue.delete(job) end |
#done? ⇒ Boolean
An execution group is “done” if nothing is queued or running and at least one job has completed.
172 173 174 175 176 177 178 |
# File 'lib/right_chimp/queue/execution_group.rb', line 172 def done? return ( get_jobs_by_status(Executor::STATUS_NONE).size == 0 && get_jobs_by_status(Executor::STATUS_RUNNING).size == 0 && get_jobs_by_status(Executor::STATUS_DONE).size > 0 ) end |
#get_job(i) ⇒ Object
Get a particular job
131 132 133 |
# File 'lib/right_chimp/queue/execution_group.rb', line 131 def get_job(i) @jobs_by_id[i] end |
#get_job_ids ⇒ Object
Get all job ids
124 125 126 |
# File 'lib/right_chimp/queue/execution_group.rb', line 124 def get_job_ids @jobs_by_id.keys end |
#get_jobs ⇒ Object
Get all jobs
117 118 119 |
# File 'lib/right_chimp/queue/execution_group.rb', line 117 def get_jobs @jobs_by_id.values end |
#get_jobs_by_status(status) ⇒ Object
Get jobs by status
138 139 140 141 142 143 144 |
# File 'lib/right_chimp/queue/execution_group.rb', line 138 def get_jobs_by_status(status) r = [] @jobs_by_id.values.each do |i| r << i if i.status == status.to_sym || status.to_sym == :all end return r end |
#get_total_exec_time ⇒ Object
Return total execution time
240 241 242 243 244 245 246 247 248 |
# File 'lib/right_chimp/queue/execution_group.rb', line 240 def get_total_exec_time if @time_start == nil return 0 elsif @time_end == nil return Time.now.to_i - @time_start.to_i else return @time_end.to_i- @time_start.to_i end end |
#job_completed ⇒ Object
146 147 148 |
# File 'lib/right_chimp/queue/execution_group.rb', line 146 def job_completed @time_end = Time.now end |
#push(j) ⇒ Object
Add something to the work queue
42 43 44 45 46 47 48 |
# File 'lib/right_chimp/queue/execution_group.rb', line 42 def push(j) raise "invalid work" if j == nil j.job_id = IDManager.get if j.job_id == nil j.group = self @queue.push(j) @jobs_by_id[j.job_id] = j end |
#queue(id) ⇒ Object
Queue a held job by id
202 203 204 205 206 207 208 209 210 |
# File 'lib/right_chimp/queue/execution_group.rb', line 202 def queue(id) Log.debug "Queuing held job id #{id}" job = @jobs_by_id[id] job.owner = nil job.time_start = Time.now job.time_end = nil job.status = Executor::STATUS_NONE self.push(job) end |
#ready? ⇒ Boolean
An execution group is “ready” if it has work that can be done; see implementation in child classes.
164 165 166 |
# File 'lib/right_chimp/queue/execution_group.rb', line 164 def ready? raise "unimplemented" end |
#requeue(id) ⇒ Object
Requeue a job by id
215 216 217 218 219 220 221 222 223 |
# File 'lib/right_chimp/queue/execution_group.rb', line 215 def requeue(id) Log.debug "Requeuing job id #{id}" job = @jobs_by_id[id] job.status = Executor::STATUS_NONE job.owner = nil job.time_start = Time.now job.time_end = nil self.push(job) end |
#requeue_failed_jobs! ⇒ Object
Requeue all failed jobs
193 194 195 196 197 |
# File 'lib/right_chimp/queue/execution_group.rb', line 193 def requeue_failed_jobs! get_jobs_by_status(Executor::STATUS_ERROR).each do |job| requeue(job.job_id) end end |
#reset! ⇒ Object
Reset the queue
110 111 112 |
# File 'lib/right_chimp/queue/execution_group.rb', line 110 def reset! @queue = [] end |
#results ⇒ Object
Return a hash of the results
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/right_chimp/queue/execution_group.rb', line 71 def results return self.get_jobs.map do |task| next if task == nil next if task.server == nil { :job_id => task.job_id, :name => task.info[0], :host => task.server.name, :status => task.status, :error => task.error, :total => self.get_total_execution_time(task.status, task.time_start, task.time_end), :start => task.time_start, :end => task.time_end, :worker => task } end end |
#running? ⇒ Boolean
Is this execution group running anything?
183 184 185 186 187 188 |
# File 'lib/right_chimp/queue/execution_group.rb', line 183 def running? total_jobs_running = get_jobs_by_status(Executor::STATUS_NONE).size + get_jobs_by_status(Executor::STATUS_RUNNING).size + get_jobs_by_status(Executor::STATUS_RETRYING).size (total_jobs_running > 0) end |
#set_jobs(jobs = []) ⇒ Object
Reset all jobs and bulk set them
153 154 155 156 157 158 |
# File 'lib/right_chimp/queue/execution_group.rb', line 153 def set_jobs(jobs=[]) self.reset! jobs.each do |job| self.push(job) end end |
#shift ⇒ Object
Take something from the queue
53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/right_chimp/queue/execution_group.rb', line 53 def shift updated_queue = [] found_job = nil @queue.each do |job| if found_job || job.status == Executor::STATUS_HOLDING updated_queue.push(job) elsif job.status == Executor::STATUS_NONE found_job = job end end @queue = updated_queue @time_start = Time.now if @time_start == nil return found_job end |
#size ⇒ Object
Size of the active queue
92 93 94 |
# File 'lib/right_chimp/queue/execution_group.rb', line 92 def size return @queue.size end |
#sort! ⇒ Object
Sort queue by server nickname
99 100 101 102 103 104 105 |
# File 'lib/right_chimp/queue/execution_group.rb', line 99 def sort! if @queue != nil @queue.sort! do |a,b| a.server.nickname <=> b.server.nickname end end end |
#to_s ⇒ Object
Print out ExecutionGroup information
253 254 255 |
# File 'lib/right_chimp/queue/execution_group.rb', line 253 def to_s return "#{self.class}[#{group_id}]: ready=#{self.ready?} total_jobs=#{@jobs_by_id.size} queued_jobs=#{self.size}" end |