Class: JobBoss::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/job_boss/job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.call_path(path, *args) ⇒ Object

Raises:

  • (ArgumentError)


265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/job_boss/job.rb', line 265

# Resolves a "controller#action" path to a receiver and invokes the action.
#
# Two candidate receivers are tried, in this order:
#   1. a new instance of the "<Controller>Jobs" constant
#   2. the "<Controller>" constant itself
# The first candidate that responds to the action receives the call.
#
# @param path [String] "controller#action", with at least one character on
#   each side of the "#"
# @param args [Array] arguments forwarded to the action
# @return [Object] whatever the action returns
# @raise [ArgumentError] if the path has no "#", if neither constant
#   resolves, or if no candidate responds to the action
def call_path(path, *args)
  require 'active_support'

  raise ArgumentError, "Invalid path (must have #)" unless path.match(/.#./)
  controller, action = path.split('#')
  class_name = controller.classify

  candidates = []

  # Only the constant lookup is guarded. NoMethodError is a subclass of
  # NameError, so rescuing around `.new` as well would silently swallow
  # errors raised inside the jobs class' own initializer and misreport them
  # as an invalid controller.
  jobs_class = begin
    Kernel.const_get("#{class_name}Jobs")
  rescue NameError
    nil
  end
  # A constant that cannot be instantiated (e.g. a module) is skipped.
  candidates << jobs_class.new if jobs_class.respond_to?(:new)

  begin
    candidates << Kernel.const_get(class_name)
  rescue NameError
    # No such constant: leave the candidate list as it is.
  end

  raise ArgumentError, "Invalid controller: #{controller}" if candidates.empty?

  receiver = candidates.detect { |candidate| candidate.respond_to?(action) }

  raise ArgumentError, "Invalid path action: #{action}" unless receiver

  receiver.send(action, *args)
end

.cancel(jobs = nil) ⇒ Object



219
220
221
222
223
# File 'lib/job_boss/job.rb', line 219

# Cancels the given jobs, or every job in the current scope when none are
# given.
#
# @param jobs [Job, Array<Job>, nil] a single job, a collection of jobs, or
#   nil to cancel the whole scope
# @return [Object] the collection that was iterated
def cancel(jobs = nil)
  # get_jobs wraps a single Job in an array and substitutes the scope for nil.
  jobs = get_jobs(jobs)

  # Iterate the normalized argument. Iterating `self.all` here would ignore
  # the argument and cancel every job in scope even when specific jobs were
  # passed in.
  jobs.each(&:cancel)
end

.cancelled?Boolean

Returns:

  • (Boolean)


215
216
217
# File 'lib/job_boss/job.rb', line 215

# Reports whether at least one job in the current scope has a cancelled_at
# timestamp.
#
# @return [Boolean]
def cancelled?
  first_cancelled = self.where('cancelled_at IS NOT NULL').first
  first_cancelled ? true : false
end

.completed_percentObject

Returns the completed fraction of a set of jobs as a ratio (completed count divided by total count); multiply by 100 for a percentage



156
157
158
# File 'lib/job_boss/job.rb', line 156

# Ratio of completed jobs to all jobs in the current scope.
#
# The value is completed.count divided by count as floats; it is not
# multiplied by 100. With a zero count the division is 0.0 / 0.0, i.e. NaN.
#
# @return [Float]
def self.completed_percent
  finished = self.completed.count.to_f
  total = self.count.to_f

  finished / total
end

.delete_jobs_before(time) ⇒ Object

Given a time object, deletes all jobs which were completed earlier than that time.



233
234
235
# File 'lib/job_boss/job.rb', line 233

# Deletes completed jobs whose completed_at is earlier than the given time.
#
# @param time [Time] cutoff; only jobs completed strictly before it match
# @return [Object] the result of delete_all on the filtered scope
def delete_jobs_before(time)
  stale_jobs = completed.where('completed_at < ?', time)
  stale_jobs.delete_all
end

.get_jobs(jobs) ⇒ Object



225
226
227
228
229
# File 'lib/job_boss/job.rb', line 225

# Normalizes a "jobs" argument into something enumerable.
#
# @param jobs [Job, Object, nil] a single Job, an existing collection, or nil
# @return [Object] a one-element array for a single Job, self.scoped for
#   nil, and the argument itself otherwise
def get_jobs(jobs)
  case jobs
  when Job then [jobs]
  when nil then self.scoped
  else jobs
  end
end

.result_hash(jobs = nil) ⇒ Object

Given a job or an array of jobs, returns a hash where the keys are the job method arguments and the values are the results of the job processing.



202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/job_boss/job.rb', line 202

# Builds a hash mapping each job's arguments to its result.
#
# @param jobs [Job, Array<Job>, nil] a job, a collection of jobs, or nil for
#   the current scope (see get_jobs)
# @return [Hash] job.args => job.result for every job; when two jobs share
#   the same args the later one wins
def result_hash(jobs = nil)
  jobs = get_jobs(jobs)

  # The #result method automatically reloads the result if needed, but
  # re-fetching everything here does it in one SQL call.
  jobs = Job.find(jobs.collect(&:id))

  require 'yaml'
  # Fill one hash in place; merging inside the fold would copy the whole
  # accumulator for every job.
  jobs.inject({}) do |hash, job|
    hash[job.args] = job.result
    hash
  end
end

.time_takenObject

How long has the set of jobs taken? Returns nil if not all jobs are complete.



149
150
151
152
153
# File 'lib/job_boss/job.rb', line 149

# Elapsed time for a set of jobs: latest completed_at minus earliest
# started_at.
#
# @return [Float, nil] the difference between the two timestamps, or nil
#   when not every job in the scope is completed or when either timestamp is
#   unavailable (for instance, an empty scope)
def self.time_taken
  return nil if self.completed.count != self.count

  finished_at = self.maximum(:completed_at)
  began_at = self.minimum(:started_at)

  # An empty scope passes the guard above (0 == 0) but has no timestamps;
  # answer nil instead of calling `-` on nil.
  return nil unless finished_at && began_at

  finished_at - began_at
end

.wait_for_jobs(jobs = nil, sleep_interval = 0.5) ⇒ Object

Given a job or an array of jobs, causes the process to sleep until all specified jobs have completed. sleep_interval specifies the polling period in seconds.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/job_boss/job.rb', line 178

# Blocks until none of the given jobs is unfinished, polling the database.
#
# If a block is given it is called after every poll with the percentage
# (0.0 to 100.0) of the waited-on jobs that are completed.
#
# An at_exit hook is registered on every call; it cancels whichever of the
# waited-on jobs are still not completed when the process exits.
#
# @param jobs [Job, Array<Job>, nil] a job, a collection of jobs, or nil for
#   the current scope (see get_jobs)
# @param sleep_interval [Numeric] seconds to sleep between polls
# @yieldparam percent [Float] completed percentage of the waited-on jobs
# @return [true]
def wait_for_jobs(jobs = nil, sleep_interval = 0.5)
  jobs = get_jobs(jobs)
  ids = jobs.collect(&:id)

  at_exit do
    # Filter by id with `where` rather than `find`: `find` with a list
    # raises RecordNotFound unless every requested record is in the scope,
    # which would make this hook blow up as soon as any job had completed.
    Job.not_completed.where('id in (?)', ids).each(&:cancel)
  end

  # Bypass the query cache so each poll sees fresh rows.
  Job.uncached do
    until Job.unfinished.find_all_by_id(ids).count == 0
      sleep(sleep_interval)

      if block_given?
        yield((Job.where('id in (?)', ids).completed.count.to_f / jobs.size.to_f) * 100.0)
      end
    end
  end

  true
end

Instance Method Details

#assigned?Boolean

Has the job been assigned to an employee?

Returns:

  • (Boolean)


121
122
123
124
125
126
127
# File 'lib/job_boss/job.rb', line 121

# Has the job been assigned to an employee?
#
# Reloads the record first when the job is not completed and no employee
# pid is known yet, so that an assignment made elsewhere is picked up.
#
# @return [Object, nil] employee_host when an employee pid is present,
#   otherwise the falsy employee_pid value
def assigned?
  needs_refresh = !completed? && employee_pid.nil?
  self.reload if needs_refresh

  pid = employee_pid
  pid ? employee_host : pid
end

#batchObject



59
60
61
# File 'lib/job_boss/job.rb', line 59

# Builds a Batch from this job's batch_id and priority.
#
# @return [Batch, nil] a new Batch, or the falsy batch_id when the job has
#   no batch
def batch
  current_batch_id = self.batch_id
  return current_batch_id unless current_batch_id

  Batch.new(:batch_id => current_batch_id, :priority => self.priority)
end

#cancelObject

Mark the job as cancelled so that the boss won’t run the job and so that the employee running the job gets stopped (if it’s been dispatched)



96
97
98
# File 'lib/job_boss/job.rb', line 96

# Cancels this job by delegating to mark_as_cancelled (defined outside this
# listing).
#
# @return [Object] whatever mark_as_cancelled returns
def cancel
  mark_as_cancelled
end

#cancelled?Boolean

Has the job been cancelled?

Returns:

  • (Boolean)


101
102
103
# File 'lib/job_boss/job.rb', line 101

# Has the job been cancelled?
#
# @return [Boolean] true when cancelled_at is set
def cancelled?
  cancelled_at ? true : false
end

#completed?Boolean

Is the job complete?

Returns:

  • (Boolean)


90
91
92
# File 'lib/job_boss/job.rb', line 90

# Is the job complete?
#
# @return [Boolean] true when completed_at is set
def completed?
  completed_at ? true : false
end

#dispatch(boss) ⇒ Object

Method used by the boss to dispatch an employee



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/job_boss/job.rb', line 22

# Method used by the boss to dispatch an employee: forks a child process
# that runs this job and records its outcome.
#
# @param boss [Object] must expose a #logger responding to #info and #error
# @return [Object] the result of reconnecting the parent's database
#   connection
def dispatch(boss)
  boss.logger.info "Dispatching Job ##{self.id}: #{self.prototype}"

  pid = fork do
    # Child process: re-establish the database connection before using it.
    # NOTE(review): presumably because the connection inherited across the
    # fork must not be shared with the parent - confirm.
    ActiveRecord::Base.connection.reconnect!
    # Process title shown in process listings.
    # NOTE(review): the trailing ")" looks like a typo, since #prototype
    # already ends with one - confirm nothing parses this title before
    # changing it.
    $0 = "[job_boss employee] job ##{self.id} #{self.prototype})"
    # Set this process' scheduling priority (nice value) to 19.
    Process.setpriority(Process::PRIO_PROCESS, 0, 19)

    begin
      mark_employee

      # Run the job itself; self.path is resolved by the class-level
      # call_path with self.args as arguments.
      value = self.class.call_path(self.path, *self.args)

      self.update_attribute(:result, value)
    rescue Exception => exception
      # NOTE(review): rescuing Exception also catches SystemExit and signal
      # exceptions raised inside the job - presumably intentional so every
      # failure is recorded via mark_exception; confirm.
      mark_exception(exception)
      boss.logger.error "Error running job ##{self.id}!"
    ensure
      # Keep retrying once per second until mark_as_completed reports
      # success, whether the job succeeded or raised.
      until mark_as_completed
        sleep(1)
      end

      boss.logger.info "Job ##{self.id} completed in #{self.time_taken} seconds, exiting..."
      # End the child process here so it never returns into the caller's
      # code.
      Kernel.exit
    end
  end

  # Parent process: do not wait on the child, and refresh the parent's own
  # database connection after the fork.
  Process.detach(pid)
  ActiveRecord::Base.connection.reconnect!
end

#errorObject

If the job raised an exception, this method will return the instance of that exception with the message and backtrace



162
163
164
165
166
167
168
169
170
171
172
# File 'lib/job_boss/job.rb', line 162

# Rebuilds the exception recorded for this job, if any.
#
# Reloads the record first when the job is not completed and no error
# message is present yet, in case an error was recorded elsewhere.
#
# @return [Exception, nil] an instance of the stored error class carrying
#   the stored message and backtrace, or nil unless class, message and
#   backtrace are all present
def error
  self.reload if !completed? && error_message.nil?

  return nil unless error_class && error_message && error_backtrace

  exception = Kernel.const_get(error_class).new(error_message)
  exception.set_backtrace(error_backtrace)
  exception
end

#mark_as_miaObject



110
111
112
113
114
# File 'lib/job_boss/job.rb', line 110

# Flags the job as missing in action: sets status to 'mia', stamps
# completed_at with the current time and saves the record.
#
# @return [Object] the result of save
def mark_as_mia
  self.status = 'mia'
  self.completed_at = Time.now

  save
end

#mark_as_startedObject



238
239
240
# File 'lib/job_boss/job.rb', line 238

# Stamps started_at with the current time, but only if the row for this job
# still has no started_at.
#
# @return [Boolean] true when the update touched at least one row
def mark_as_started
  assignment = ['started_at = ?', Time.now]
  only_unstarted = ['started_at IS NULL AND id = ?', self.id]

  affected_rows = Job.update_all(assignment, only_unstarted)
  affected_rows > 0
end

#mark_for_redoObject

Clear out the job and put it back onto the queue for processing



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/job_boss/job.rb', line 74

# Clears out the job and puts it back onto the queue for processing.
#
# Reloads the record, resets every run-related attribute to nil through its
# writer, then saves.
#
# @return [Object] the result of save
def mark_for_redo
  self.reload

  run_state = [
    :started_at, :result, :completed_at, :cancelled_at, :status,
    :error_class, :error_message, :error_backtrace,
    :employee_host, :employee_pid
  ]
  run_state.each { |attribute| send("#{attribute}=", nil) }

  self.save
end

#mia?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/job_boss/job.rb', line 116

# Did the job finish with the 'mia' status?
#
# @return [Boolean, nil] the falsy completed_at when the job has not
#   completed, otherwise whether status equals 'mia'
def mia?
  finished_at = completed_at
  return finished_at unless finished_at

  status == 'mia'
end

#prototypeObject



17
18
19
# File 'lib/job_boss/job.rb', line 17

# Human-readable call signature of the job: the path followed by the
# inspected arguments in parentheses, separated by ", ".
#
# @return [String]
def prototype
  prefix = self.path
  rendered_args = self.args.map { |arg| arg.inspect }.join(', ')

  prefix + "(#{rendered_args})"
end

#resultObject



63
64
65
66
67
68
69
70
71
# File 'lib/job_boss/job.rb', line 63

# The job's result value.
#
# Reloads the record first when the job is not completed and no result is
# stored yet, in case one was written elsewhere. A stored Array is
# unwrapped to its first element; any other stored value is returned as is.
#
# @return [Object, nil]
def result
  stale = !completed? && read_attribute(:result).nil?
  self.reload if stale

  stored = read_attribute(:result)
  return stored unless stored.is_a?(Array)

  stored.first
end

#result=(value) ⇒ Object

Store the result as the first and only value of an array so that the value always gets serialized. (There were issues with the boolean value false getting stored as the string “f”.)



55
56
57
# File 'lib/job_boss/job.rb', line 55

# Stores the result wrapped in a one-element array.
#
# @param value [Object] the job's result
def result=(value)
  wrapped = [value]
  write_attribute(:result, wrapped)
end

#running?Boolean

Is the job running?

Returns:

  • (Boolean)


130
131
132
133
134
135
136
# File 'lib/job_boss/job.rb', line 130

# Is the job running?
#
# Reloads the record first whenever started_at or completed_at is missing,
# so the answer reflects the latest stored timestamps.
#
# @return [Boolean, nil] the falsy started_at when the job has not started,
#   otherwise whether completed_at is still unset
def running?
  timestamps_missing = started_at.nil? || completed_at.nil?
  self.reload if timestamps_missing

  began_at = started_at
  began_at ? !completed_at : began_at
end

#succeeded?Boolean

Did the job succeed?

Returns:

  • (Boolean)


106
107
108
# File 'lib/job_boss/job.rb', line 106

# Did the job succeed?
#
# @return [Boolean, nil] the falsy completed_at when the job has not
#   completed, otherwise whether status equals 'success'
def succeeded?
  finished_at = completed_at
  return finished_at unless finished_at

  status == 'success'
end

#time_takenObject

How long did the job take?



139
140
141
142
143
144
145
# File 'lib/job_boss/job.rb', line 139

# How long did the job take?
#
# Reloads the record first whenever started_at or completed_at is missing,
# in case they were written elsewhere.
#
# @return [Float, nil] completed_at minus started_at, or nil while either
#   timestamp is still missing
def time_taken
  timestamps_known = !started_at.nil? && !completed_at.nil?
  self.reload unless timestamps_known

  finished = completed_at
  began = started_at
  return nil unless finished && began

  finished - began
end