Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_fork ⇒ Object



35
36
37
# File 'lib/delayed/backend/active_record.rb', line 35

# Calls establish_connection (with no arguments) on ::ActiveRecord::Base and
# returns whatever that call returns.
# NOTE(review): presumably invoked in a freshly forked process, per the hook
# name -- the caller is not visible here.
def self.after_fork
  ar_base = ::ActiveRecord::Base
  ar_base.establish_connection
end

.before_fork ⇒ Object



31
32
33
# File 'lib/delayed/backend/active_record.rb', line 31

# Calls clear_all_connections! on ::ActiveRecord::Base and returns whatever
# that call returns.
# NOTE(review): presumably invoked just before forking, per the hook name --
# the caller is not visible here.
def self.before_fork
  ar_base = ::ActiveRecord::Base
  ar_base.clear_all_connections!
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



40
41
42
# File 'lib/delayed/backend/active_record.rb', line 40

# Releases every lock held by the given worker: issues a single update_all that
# nulls out locked_by and locked_at on all rows whose locked_by equals
# +worker_name+.
#
# worker_name - value matched against the locked_by column.
#
# Returns whatever update_all returns.
def self.clear_locks!(worker_name)
  release_lock_sql = "locked_by = null, locked_at = null"
  held_by_worker = ["locked_by = ?", worker_name]
  update_all(release_lock_sql, held_by_worker)
end

.db_time_now ⇒ Object

Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



82
83
84
85
86
87
88
89
90
# File 'lib/delayed/backend/active_record.rb', line 82

# Returns the current time, choosing the representation in this order:
#   1. Time.zone.now when Time.zone is set;
#   2. Time.now.utc when ::ActiveRecord::Base.default_timezone is :utc;
#   3. Time.now otherwise.
# The time is taken from the local process clock; no database query is made.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if ::ActiveRecord::Base.default_timezone == :utc

  Time.now
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object

Find a few candidate jobs to run (in case some immediately get locked by others).



45
46
47
48
49
50
51
52
53
54
# File 'lib/delayed/backend/active_record.rb', line 45

# Fetches up to +limit+ candidate jobs for a worker.
#
# Starts from ready_to_run(worker_name, max_run_time) and narrows it with the
# Worker-level settings that are present:
#   * Worker.min_priority -> priority >= min
#   * Worker.max_priority -> priority <= max
#   * Worker.queues (when non-empty) -> queue IN (queues)
# The resulting scope is ordered via by_priority and loaded inside
# ::ActiveRecord::Base.silence.
#
# worker_name  - forwarded to ready_to_run.
# limit        - maximum number of rows to load (default 5).
# max_run_time - forwarded to ready_to_run (default Worker.max_run_time).
#
# Returns the result of the final all(:limit => limit) call.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker_name, max_run_time)

  if Worker.min_priority
    candidates = candidates.scoped(:conditions => ['priority >= ?', Worker.min_priority])
  end

  if Worker.max_priority
    candidates = candidates.scoped(:conditions => ['priority <= ?', Worker.max_priority])
  end

  if Worker.queues.any?
    candidates = candidates.scoped(:conditions => ["queue IN (?)", Worker.queues])
  end

  ::ActiveRecord::Base.silence { candidates.by_priority.all(:limit => limit) }
end

.rails3? ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/delayed/backend/active_record.rb', line 15

# Returns true when ::ActiveRecord::VERSION::MAJOR equals 3, false otherwise.
def self.rails3?
  major_version = ::ActiveRecord::VERSION::MAJOR
  major_version == 3
end

Instance Method Details

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/delayed/backend/active_record.rb', line 58

# Tries to take (or refresh) the lock on this job for +worker+ using a single
# conditional UPDATE, so the database decides who wins.
#
# max_run_time - converted with to_i and subtracted from the current time; a
#                lock whose locked_at is older than that is treated as
#                claimable.
# worker       - value compared against, and written to, locked_by.
#
# Returns true when exactly one row was updated (the in-memory locked_at and
# locked_by are then set and flagged via *_will_change!), false otherwise (the
# in-memory record is left untouched).
def lock_exclusively!(max_run_time, worker)
  job_class = self.class
  now = job_class.db_time_now

  affected_rows =
    if locked_by == worker
      # Already ours (e.g. left over after a crash): just refresh locked_at.
      job_class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
    else
      # Not ours: claim it only if it is unlocked or its lock has expired, and
      # only if the job is due to run.
      lock_expiry = now - max_run_time.to_i
      job_class.update_all(
        ["locked_at = ?, locked_by = ?", now, worker],
        ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, lock_expiry, now]
      )
    end

  return false unless affected_rows == 1

  # Mirror the database change on this instance.
  self.locked_at = now
  self.locked_by = worker
  locked_at_will_change!
  locked_by_will_change!
  true
end

#reload(*args) ⇒ Object



92
93
94
95
# File 'lib/delayed/backend/active_record.rb', line 92

# Calls reset and then delegates to the superclass implementation of reload,
# forwarding every argument unchanged.
# NOTE(review): reset is not defined in this view -- presumably supplied by the
# included Base module; confirm what state it clears.
#
# Returns whatever the superclass reload returns.
def reload(*args)
  reset
  super(*args)
end