Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Constant Summary

Constants included from Base

Base::ParseObjectFromYaml

Class Method Summary

Methods included from Base

#failed?, #hook, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock

Class Method Details

.after_fork ⇒ Object

# File 'lib/delayed/backend/active_record.rb', line 27

def self.after_fork
  ::ActiveRecord::Base.establish_connection
end

.before_fork ⇒ Object

# File 'lib/delayed/backend/active_record.rb', line 23

def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.

# File 'lib/delayed/backend/active_record.rb', line 32

def self.clear_locks!(worker_name)
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
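
As a rough illustration of what this update does, the sketch below applies the same rule to an in-memory list instead of a database table. `JobRow` and `clear_locks_sketch` are hypothetical names invented for this example and are not part of delayed_job.

```ruby
# Hypothetical in-memory stand-in for a row of the jobs table.
JobRow = Struct.new(:id, :locked_by, :locked_at)

# Mirrors the effect of clear_locks!: every row held by worker_name
# has both lock columns reset; rows held by other workers are untouched.
def clear_locks_sketch(rows, worker_name)
  rows.each do |row|
    next unless row.locked_by == worker_name

    row.locked_by = nil
    row.locked_at = nil
  end
end

rows = [
  JobRow.new(1, "worker-a", Time.now),
  JobRow.new(2, "worker-b", Time.now),
  JobRow.new(3, nil, nil)
]
clear_locks_sketch(rows, "worker-a")
```

After the call, job 1 is unlocked while job 2 is still held by "worker-b", which is why a worker can safely run this on exit without disturbing its peers.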

.db_time_now ⇒ Object

Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.

# File 'lib/delayed/backend/active_record.rb', line 61

def self.db_time_now
  if Time.zone
    Time.zone.now
  elsif ::ActiveRecord::Base.default_timezone == :utc
    Time.now.utc
  else
    Time.now
  end
end

.jobs_available_to_worker(worker_name, max_run_time) ⇒ Object

# File 'lib/delayed/backend/active_record.rb', line 36

def self.jobs_available_to_worker(worker_name, max_run_time)
  scope = self.ready_to_run(worker_name, max_run_time)
  scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
  scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
  scope.by_priority
end
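
The optional priority bounds can be sketched on a plain array. This is only an approximation: `PriorityJob` and `filter_by_priority` are hypothetical names, and the sketch assumes `by_priority` orders by ascending priority, which the source shown here does not confirm.

```ruby
# Hypothetical stand-in carrying only the fields the sketch needs.
PriorityJob = Struct.new(:id, :priority)

# Keep jobs inside the optional [min_priority, max_priority] bounds.
# A nil bound means "no limit" on that side, like the `if Worker.min_priority`
# and `if Worker.max_priority` guards above.
def filter_by_priority(jobs, min_priority: nil, max_priority: nil)
  selected = jobs
  selected = selected.select { |job| job.priority >= min_priority } if min_priority
  selected = selected.select { |job| job.priority <= max_priority } if max_priority
  # Assumption: lower priority values are served first.
  selected.sort_by(&:priority)
end

jobs = [PriorityJob.new(1, 5), PriorityJob.new(2, 0), PriorityJob.new(3, 10)]
in_range = filter_by_priority(jobs, min_priority: 0, max_priority: 5)
```

Because `0` is truthy in Ruby, a bound of `0` still applies the filter; only `nil` (or `false`) disables it.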

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object

Reserve a single job in a single update query. This causes workers to serialize on the database and avoids contention.

# File 'lib/delayed/backend/active_record.rb', line 45

def self.reserve(worker, max_run_time = Worker.max_run_time)
  affected_rows = 0
  ::ActiveRecord::Base.silence do
    affected_rows = jobs_available_to_worker(worker.name, max_run_time).limit(1).update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name])
  end

  if affected_rows == 1
    locked_by_worker(worker.name, max_run_time).first
  else
    nil
  end
end
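
The claim-then-fetch pattern can be imitated without a database. In the sketch below, a `Mutex` plays the role of the database's atomic UPDATE, so two workers can never claim the same job. `ReserveSketch` and its `Job` struct are hypothetical, and the sketch ignores any other conditions `ready_to_run` applies, since that scope is not shown here.

```ruby
# Hypothetical in-memory queue; not part of delayed_job.
class ReserveSketch
  Job = Struct.new(:id, :priority, :locked_by, :locked_at)

  def initialize(jobs)
    @jobs = jobs
    @mutex = Mutex.new # stands in for the atomicity of a single UPDATE
  end

  # Claim at most one unlocked job for worker_name.
  # Returns the claimed job, or nil when nothing is available.
  def reserve(worker_name, now = Time.now)
    @mutex.synchronize do
      # Assumption: the lowest priority value is served first.
      job = @jobs.reject(&:locked_by).min_by(&:priority)
      return nil unless job

      job.locked_by = worker_name
      job.locked_at = now
      job
    end
  end
end

queue = ReserveSketch.new([
  ReserveSketch::Job.new(1, 5, nil, nil),
  ReserveSketch::Job.new(2, 0, nil, nil)
])
first = queue.reserve("worker-a")  # claims job 2 (priority 0)
second = queue.reserve("worker-b") # claims job 1
third = queue.reserve("worker-c")  # nothing left, so nil
```

Marking the job and reading it back happen as one step here; in the method above they are two queries, with the `locked_by` value tying the second query to the row the first one claimed.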