Class: JobBoss::Job
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - JobBoss::Job
- Defined in: lib/job_boss/job.rb
Class Method Summary
- .call_path(path, *args) ⇒ Object
- .cancel(jobs = nil) ⇒ Object
- .cancelled? ⇒ Boolean
- .completed_percent ⇒ Object
  Returns the completion percentage of a set of jobs.
- .delete_jobs_before(time) ⇒ Object
  Given a time object, deletes all jobs that were completed earlier than that time.
- .get_jobs(jobs) ⇒ Object
- .result_hash(jobs = nil) ⇒ Object
  Given a job or an array of jobs, returns a hash where the keys are the job method arguments and the values are the results of the job processing.
- .time_taken ⇒ Object
  How long did the set of jobs take? Returns nil if not all jobs are complete.
- .wait_for_jobs(jobs = nil, sleep_interval = 0.5) ⇒ Object
  Given a job or an array of jobs, causes the process to sleep until all specified jobs have completed. sleep_interval specifies the polling period.
Instance Method Summary
- #assigned? ⇒ Boolean
  Has the job been assigned to an employee?
- #batch ⇒ Object
- #cancel ⇒ Object
  Marks the job as cancelled so that the boss won’t run the job and so that the employee running the job gets stopped (if it’s been dispatched).
- #cancelled? ⇒ Boolean
  Has the job been cancelled?
- #completed? ⇒ Boolean
  Is the job complete?
- #dispatch(boss) ⇒ Object
  Method used by the boss to dispatch an employee.
- #error ⇒ Object
  If the job raised an exception, this method returns an instance of that exception with the message and backtrace.
- #mark_as_mia ⇒ Object
- #mark_as_started ⇒ Object
- #mark_for_redo ⇒ Object
  Clears out the job and puts it back onto the queue for processing.
- #mia? ⇒ Boolean
- #prototype ⇒ Object
- #result ⇒ Object
- #result=(value) ⇒ Object
  Stores the result as the first and only value of an array so that the value always gets serialized. This works around an issue where the boolean value false was stored as the string “f”.
- #running? ⇒ Boolean
  Is the job running?
- #succeeded? ⇒ Boolean
  Did the job succeed?
- #time_taken ⇒ Object
  How long did the job take?
Class Method Details
.call_path(path, *args) ⇒ Object
# File 'lib/job_boss/job.rb', line 265

def call_path(path, *args)
  require 'active_support'

  raise ArgumentError, "Invalid path (must have #)" unless path.match(/.#./)

  controller, action = path.split('#')

  controller_objects = []
  begin
    controller_objects << Kernel.const_get("#{controller.classify}Jobs").new
  rescue NameError
  end

  begin
    controller_objects << Kernel.const_get("#{controller.classify}")
  rescue NameError
  end

  raise ArgumentError, "Invalid controller: #{controller}" if controller_objects.empty?

  controller_object = controller_objects.detect {|controller_object| controller_object.respond_to?(action) }

  raise ArgumentError, "Invalid path action: #{action}" if !controller_object

  controller_object.send(action, *args)
end
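The lookup order in the listing above ("<Controller>Jobs" instance first, then the bare constant) may be easier to follow in a standalone form. The following is a minimal sketch, not the library's code: `ReportJobs`, `word_count`, and `call_path_sketch` are hypothetical names, and `String#capitalize` stands in for ActiveSupport's `classify` so the example runs without any gems.

```ruby
# Hypothetical job class following the "<Controller>Jobs" naming convention.
class ReportJobs
  def word_count(text)
    text.split.size
  end
end

# Simplified stand-in for call_path (assumption: capitalize replaces classify).
def call_path_sketch(path, *args)
  raise ArgumentError, "Invalid path (must have #)" unless path.match(/.#./)

  controller, action = path.split('#')

  candidates = []
  begin
    # First choice: an instance of "<Controller>Jobs".
    candidates << Object.const_get("#{controller.capitalize}Jobs").new
  rescue NameError
    # No such class; fall through to the bare constant.
  end
  begin
    # Second choice: the bare "<Controller>" constant itself.
    candidates << Object.const_get(controller.capitalize)
  rescue NameError
  end

  raise ArgumentError, "Invalid controller: #{controller}" if candidates.empty?

  # Pick the first candidate that responds to the requested action.
  target = candidates.detect { |candidate| candidate.respond_to?(action) }
  raise ArgumentError, "Invalid path action: #{action}" unless target

  target.send(action, *args)
end

call_path_sketch('report#word_count', 'a b c')   # => 3
```

A path without `#`, an unknown controller, or an unknown action each raise `ArgumentError` with a distinct message, which mirrors the three `raise` lines in the listing.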
.cancel(jobs = nil) ⇒ Object
# File 'lib/job_boss/job.rb', line 219

def cancel(jobs = nil)
  jobs = get_jobs(jobs)

  self.all.each(&:cancel)
end
.cancelled? ⇒ Boolean
# File 'lib/job_boss/job.rb', line 215

def cancelled?
  !!self.where('cancelled_at IS NOT NULL').first
end
.completed_percent ⇒ Object
Returns the completion percentage of a set of jobs
# File 'lib/job_boss/job.rb', line 156

def self.completed_percent
  self.completed.count.to_f / self.count.to_f
end
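As listed, the expression divides the completed count by the total count, so the value appears to be a ratio between 0 and 1 rather than a number out of 100. The arithmetic can be checked in isolation; `completed_ratio` below is a hypothetical helper that takes plain integers instead of querying the database.

```ruby
# Hypothetical helper reproducing only the arithmetic from the listing.
def completed_ratio(completed_count, total_count)
  completed_count.to_f / total_count.to_f
end

completed_ratio(3, 4)           # => 0.75
completed_ratio(3, 4) * 100.0   # => 75.0
completed_ratio(0, 0).nan?      # => true (0.0 / 0.0 is NaN, not an error)
```

Callers who want a value out of 100 would multiply by 100.0, and an empty job set yields NaN under float division, which is worth guarding against before display.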
.delete_jobs_before(time) ⇒ Object
Given a time object, deletes all jobs that were completed earlier than that time.

# File 'lib/job_boss/job.rb', line 233

def delete_jobs_before(time)
  completed.where('completed_at < ?', time).delete_all
end
.get_jobs(jobs) ⇒ Object
# File 'lib/job_boss/job.rb', line 225

def get_jobs(jobs)
  jobs = [jobs] if jobs.is_a?(Job)
  jobs = self.scoped if jobs.nil?
  jobs
end
.result_hash(jobs = nil) ⇒ Object
Given a job or an array of jobs, returns a hash where the keys are the job method arguments and the values are the results of the job processing.

# File 'lib/job_boss/job.rb', line 202

def result_hash(jobs = nil)
  jobs = get_jobs(jobs)

  # the #result method automatically reloads the result here if needed but this will
  # do it in one SQL call
  jobs = Job.find(jobs.collect(&:id))

  require 'yaml'
  jobs.inject({}) do |hash, job|
    hash.merge(job.args => job.result)
  end
end
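The shape of the returned hash can be illustrated without a database. This is a sketch under assumptions: `JobStub` is a hypothetical stand-in for a job record that exposes only `args` and `result`.

```ruby
# Hypothetical stand-in for a job record.
JobStub = Struct.new(:args, :result)

jobs = [
  JobStub.new([2], true),
  JobStub.new([4], false)
]

# Same fold as in the listing: each job contributes one args => result pair.
results = jobs.inject({}) do |hash, job|
  hash.merge(job.args => job.result)
end

results[[2]]   # => true
results[[4]]   # => false
```

Because the argument array is the key, two jobs with identical arguments would collapse into a single entry, with the later job's result winning.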
.time_taken ⇒ Object
How long did the set of jobs take? Returns nil if not all jobs are complete.

# File 'lib/job_boss/job.rb', line 149

def self.time_taken
  return nil if self.completed.count != self.count

  self.maximum(:completed_at) - self.minimum(:started_at)
end
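The calculation above measures the span from the earliest start to the latest completion, not the sum of individual job durations. A small in-memory sketch makes the difference visible; `JobTimes` and `batch_time_taken` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in exposing only the two timestamps used by the listing.
JobTimes = Struct.new(:started_at, :completed_at)

def batch_time_taken(jobs)
  # Mirror the nil return when any job is still incomplete.
  return nil unless jobs.all?(&:completed_at)

  jobs.map(&:completed_at).max - jobs.map(&:started_at).min
end

base = Time.at(0)
jobs = [
  JobTimes.new(base,     base + 5),   # ran for 5 seconds
  JobTimes.new(base + 2, base + 9)    # ran for 7 seconds, overlapping the first
]

batch_time_taken(jobs)                                  # => 9.0
batch_time_taken(jobs + [JobTimes.new(base + 3, nil)])  # => nil
```

The two jobs sum to 12 seconds of work, but the set took 9 seconds of wall-clock time because they overlapped.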
.wait_for_jobs(jobs = nil, sleep_interval = 0.5) ⇒ Object
Given a job or an array of jobs, causes the process to sleep until all specified jobs have completed. sleep_interval specifies the polling period.

# File 'lib/job_boss/job.rb', line 178

def wait_for_jobs(jobs = nil, sleep_interval = 0.5)
  at_exit do
    Job.not_completed.find(jobs).each(&:cancel)
  end

  jobs = get_jobs(jobs)

  ids = jobs.collect(&:id)
  Job.uncached do
    until Job.unfinished.find_all_by_id(ids).count == 0
      sleep(sleep_interval)

      if block_given?
        yield((Job.where('id in (?)', ids).completed.count.to_f / jobs.size.to_f) * 100.0)
      end
    end
  end

  true
end
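The polling pattern above (check, sleep, report progress to an optional block) can be sketched without ActiveRecord. This is a simplified illustration, not the library's implementation: `wait_for_completion` and the `count_completed` callable are hypothetical, and the sketch polls once per iteration and reports before sleeping, whereas the listing sleeps first and re-queries for the progress value.

```ruby
# Hypothetical polling helper; count_completed is any callable that returns
# the number of jobs finished so far.
def wait_for_completion(total, count_completed, sleep_interval = 0.01)
  loop do
    completed = count_completed.call

    # Report progress out of 100, as the block in the listing receives it.
    yield((completed.to_f / total.to_f) * 100.0) if block_given?

    break if completed >= total
    sleep(sleep_interval)
  end

  true
end

# Simulated poll results: the completed count observed on each check.
snapshots = [0, 1, 3, 4]
reports = []

wait_for_completion(4, -> { snapshots.shift }) { |percent| reports << percent }

reports   # => [0.0, 25.0, 75.0, 100.0]
```

A shorter sleep_interval gives smoother progress reporting at the cost of more frequent checks; in the real method each check is a database query.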
Instance Method Details
#assigned? ⇒ Boolean
Has the job been assigned to an employee?
# File 'lib/job_boss/job.rb', line 121

def assigned?
  # If the #assigned? method is being called for but the job hasn't been completed, reload
  # to check to see if it has been assigned
  self.reload if !completed? && employee_pid.nil?

  employee_pid && employee_host
end
#batch ⇒ Object
# File 'lib/job_boss/job.rb', line 59

def batch
  self.batch_id && Batch.new(:batch_id => self.batch_id, :priority => self.priority)
end
#cancel ⇒ Object
Marks the job as cancelled so that the boss won’t run the job and so that the employee running the job gets stopped (if it’s been dispatched).

# File 'lib/job_boss/job.rb', line 96

def cancel
  mark_as_cancelled
end
#cancelled? ⇒ Boolean
Has the job been cancelled?
# File 'lib/job_boss/job.rb', line 101

def cancelled?
  !!cancelled_at
end
#completed? ⇒ Boolean
Is the job complete?
# File 'lib/job_boss/job.rb', line 90

def completed?
  !!completed_at
end
#dispatch(boss) ⇒ Object
Method used by the boss to dispatch an employee
# File 'lib/job_boss/job.rb', line 22

def dispatch(boss)
  boss.logger.info "Dispatching Job ##{self.id}: #{self.prototype}"

  pid = fork do
    ActiveRecord::Base.connection.reconnect!
    $0 = "[job_boss employee] job ##{self.id} #{self.prototype})"
    Process.setpriority(Process::PRIO_PROCESS, 0, 19)

    begin
      mark_employee

      value = self.class.call_path(self.path, *self.args)

      self.update_attribute(:result, value)
    rescue Exception => exception
      mark_exception(exception)
      boss.logger.error "Error running job ##{self.id}!"
    ensure
      until mark_as_completed
        sleep(1)
      end

      boss.logger.info "Job ##{self.id} completed in #{self.time_taken} seconds, exiting..."
      Kernel.exit
    end
  end

  Process.detach(pid)
  ActiveRecord::Base.connection.reconnect!
end
#error ⇒ Object
If the job raised an exception, this method returns an instance of that exception with the message and backtrace.

# File 'lib/job_boss/job.rb', line 162

def error
  # If the error is being called for but the job hasn't been completed, reload
  # to check to see if there was a error
  self.reload if !completed? && .nil?

  if error_class && && error_backtrace
    error = Kernel.const_get(error_class).new()
    error.set_backtrace(error_backtrace)

    error
  end
end
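Rebuilding an exception from stored strings, as described above, relies on two core Ruby calls: `Kernel.const_get` to resolve the class by name and `Exception#set_backtrace` to attach the saved trace. The sketch below is illustrative only; `rebuild_exception` and its argument names are hypothetical and do not correspond to the model's column names.

```ruby
# Hypothetical helper: rebuilds an exception from its stored parts.
def rebuild_exception(class_name, message, backtrace)
  # Return nil when any part is missing, as the listing's guard does.
  return nil unless class_name && message && backtrace

  exception = Kernel.const_get(class_name).new(message)
  exception.set_backtrace(backtrace)
  exception
end

stored = rebuild_exception('ArgumentError', 'bad input', ["job.rb:1:in `run'"])

stored.class       # => ArgumentError
stored.message     # => "bad input"
stored.backtrace   # => ["job.rb:1:in `run'"]
```

The returned object is a fresh instance that has never been raised, so a caller can inspect it or re-raise it in the parent process.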
#mark_as_mia ⇒ Object
# File 'lib/job_boss/job.rb', line 110

def mark_as_mia
  self.completed_at = Time.now
  self.status = 'mia'
  self.save
end
#mark_as_started ⇒ Object
# File 'lib/job_boss/job.rb', line 238

def mark_as_started
  Job.update_all(['started_at = ?', Time.now], ['started_at IS NULL AND id = ?', self.id]) > 0
end
#mark_for_redo ⇒ Object
Clears out the job and puts it back onto the queue for processing.

# File 'lib/job_boss/job.rb', line 74

def mark_for_redo
  self.reload
  self.started_at = nil
  self.result = nil
  self.completed_at = nil
  self.cancelled_at = nil
  self.status = nil
  self.error_class = nil
  self. = nil
  self.error_backtrace = nil
  self.employee_host = nil
  self.employee_pid = nil
  self.save
end
#mia? ⇒ Boolean
# File 'lib/job_boss/job.rb', line 116

def mia?
  completed_at && (status == 'mia')
end
#prototype ⇒ Object
# File 'lib/job_boss/job.rb', line 17

def prototype
  self.path + "(#{self.args.collect(&:inspect).join(', ')})"
end
#result ⇒ Object
# File 'lib/job_boss/job.rb', line 63

def result
  # If the result is being called for but the job hasn't been completed, reload
  # to check to see if there was a result
  self.reload if !completed? && read_attribute(:result).nil?

  value = read_attribute(:result)

  value.is_a?(Array) ? value.first : value
end
#result=(value) ⇒ Object
Stores the result as the first and only value of an array so that the value always gets serialized. This works around an issue where the boolean value false was stored as the string “f”.

# File 'lib/job_boss/job.rb', line 55

def result=(value)
  write_attribute(:result, [value])
end
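The wrap-on-write and unwrap-on-read pairing between `#result=` and `#result` can be sketched with plain YAML, assuming the column is serialized in a YAML-like way. `wrap_result` and `unwrap_result` are hypothetical helpers, not library methods.

```ruby
require 'yaml'

# Hypothetical writer side: always store a one-element array.
def wrap_result(value)
  [value]
end

# Hypothetical reader side: unwrap arrays, pass anything else through.
def unwrap_result(stored)
  stored.is_a?(Array) ? stored.first : stored
end

serialized = YAML.dump(wrap_result(false))
restored   = YAML.load(serialized)

unwrap_result(restored)                                   # => false
unwrap_result(YAML.load(YAML.dump(wrap_result(nil))))     # => nil
unwrap_result(YAML.load(YAML.dump(wrap_result([1, 2]))))  # => [1, 2]
```

Because the outer array is always present, a false or nil result round-trips as itself, and a result that is already an array comes back intact as the single wrapped element.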
#running? ⇒ Boolean
Is the job running?
# File 'lib/job_boss/job.rb', line 130

def running?
  # If the #running? method is being called for but the job hasn't started, reload
  # to check to see if it has been assigned
  self.reload if started_at.nil? || completed_at.nil?

  started_at && !completed_at
end
#succeeded? ⇒ Boolean
Did the job succeed?
# File 'lib/job_boss/job.rb', line 106

def succeeded?
  completed_at && (status == 'success')
end
#time_taken ⇒ Object
How long did the job take?
# File 'lib/job_boss/job.rb', line 139

def time_taken
  # If the #time_taken method is being called for but the job doesn't seem to have started/completed
  # reload to see if it has
  self.reload if started_at.nil? || completed_at.nil?

  completed_at - started_at if completed_at && started_at
end