Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Constant Summary

Constants included from Base

Base::ParseObjectFromYaml

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#failed?, #hook, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock

Class Method Details

.after_fork ⇒ Object



23
24
25
# File 'lib/delayed/backend/active_record.rb', line 23

# Fork hook (the name suggests it is meant to run once a fork has completed;
# the caller is not visible here).
# Asks ::ActiveRecord::Base to establish a connection using its default
# (argument-less) configuration.
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end

.before_fork ⇒ Object



19
20
21
# File 'lib/delayed/backend/active_record.rb', line 19

# Fork hook (the name suggests it is meant to run just before forking;
# the caller is not visible here).
# Tells ::ActiveRecord::Base to clear all of its connections.
def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



28
29
30
# File 'lib/delayed/backend/active_record.rb', line 28

# Releases every lock held by the given worker by nulling out the
# locked_by / locked_at columns on all rows whose locked_by matches.
#
# worker_name - the name stored in locked_by for the worker that is exiting.
#
# Returns whatever update_all returns.
def self.clear_locks!(worker_name)
  release_lock    = "locked_by = null, locked_at = null"
  held_by_worker  = ["locked_by = ?", worker_name]

  update_all(release_lock, held_by_worker)
end

.db_time_now ⇒ Object

Get the current time (GMT or local depending on DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



69
70
71
72
73
74
75
76
77
# File 'lib/delayed/backend/active_record.rb', line 69

# Returns the current time as computed on this machine (no database query).
#
# Resolution order:
#   1. Time.zone.now when Time.zone is set;
#   2. Time.now.utc when ::ActiveRecord::Base.default_timezone is :utc;
#   3. Time.now otherwise.
def self.db_time_now
  return Time.zone.now if Time.zone

  utc_database = ::ActiveRecord::Base.default_timezone == :utc
  utc_database ? Time.now.utc : Time.now
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object

Find a few candidate jobs to run (in case some immediately get locked by others).



33
34
35
36
37
38
39
40
41
# File 'lib/delayed/backend/active_record.rb', line 33

# Fetches up to +limit+ candidate jobs for a worker.
#
# Starts from the ready_to_run scope, narrows it by Worker.min_priority and
# Worker.max_priority when either is set, orders via by_priority, and runs
# the query inside ::ActiveRecord::Base.silence.
#
# worker_name  - passed through to ready_to_run.
# limit        - maximum number of jobs to fetch (default 5).
# max_run_time - passed through to ready_to_run (default Worker.max_run_time).
#
# Returns the result of the query (the value of the silence block).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker_name, max_run_time)

  if Worker.min_priority
    candidates = candidates.scoped(:conditions => ['priority >= ?', Worker.min_priority])
  end

  if Worker.max_priority
    candidates = candidates.scoped(:conditions => ['priority <= ?', Worker.max_priority])
  end

  ::ActiveRecord::Base.silence { candidates.by_priority.all(:limit => limit) }
end

Instance Method Details

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/delayed/backend/active_record.rb', line 45

# Tries to take (or refresh) the lock on this job for +worker+ with a single
# conditional UPDATE, so that only one caller can win.
#
# max_run_time - a lock whose locked_at is older than now minus
#                max_run_time.to_i no longer blocks a new owner.
# worker       - the name to record in locked_by.
#
# Returns true when exactly one row was updated (the in-memory locked_at and
# locked_by are then set and flagged as changed), false otherwise.
def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now

  if locked_by != worker
    # Someone else (or nobody) holds the job: claim it only if it is unlocked
    # or its lock is older than the cutoff, and it is due to run.
    assignments = ["locked_at = ?, locked_by = ?", now, worker]
    conditions  = ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]
  else
    # The job is already ours (e.g. the job queue crashed): just refresh
    # the lock timestamp and carry on.
    assignments = ["locked_at = ?", now]
    conditions  = ["id = ? and locked_by = ?", id, worker]
  end

  return false unless self.class.update_all(assignments, conditions) == 1

  self.locked_at = now
  self.locked_by = worker
  locked_at_will_change!
  locked_by_will_change!
  true
end