Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_forkObject



34
35
36
# File 'lib/delayed/backend/active_record.rb', line 34

def self.after_fork
  ::ActiveRecord::Base.establish_connection
end

.before_forkObject



30
31
32
# File 'lib/delayed/backend/active_record.rb', line 30

def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end

.by_priorityObject



26
27
28
# File 'lib/delayed/backend/active_record.rb', line 26

def self.by_priority
  order('priority ASC, run_at ASC')
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



39
40
41
# File 'lib/delayed/backend/active_record.rb', line 39

def self.clear_locks!(worker_name)
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

.db_time_nowObject

Get the current time (GMT or local depending on DB) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.



98
99
100
101
102
103
104
105
106
# File 'lib/delayed/backend/active_record.rb', line 98

def self.db_time_now
  if Time.zone
    Time.zone.now
  elsif ::ActiveRecord::Base.default_timezone == :utc
    Time.now.utc
  else
    Time.now
  end
end

.ready_to_run(worker_name, max_run_time) ⇒ Object



22
23
24
# File 'lib/delayed/backend/active_record.rb', line 22

def self.ready_to_run(worker_name, max_run_time)
  where('(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name)
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/delayed/backend/active_record.rb', line 43

def self.reserve(worker, max_run_time = Worker.max_run_time)
  # scope to filter to records that are "ready to run"
  readyScope = self.ready_to_run(worker.name, max_run_time)

  # scope to filter to the single next eligible job
  nextScope = readyScope.scoped
  nextScope = nextScope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
  nextScope = nextScope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
  nextScope = nextScope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
  nextScope = nextScope.scoped.by_priority.limit(1)

  now = self.db_time_now

  if ::ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
    # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
    # This locks the single record 'FOR UPDATE' in the subquery (http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE)
    # Note: active_record would attempt to generate UPDATE...LIMIT like sql for postgres if we use a .limit() filter, but it would not use
    # 'FOR UPDATE' and we would have many locking conflicts
    quotedTableName = ::ActiveRecord::Base.connection.quote_column_name(self.table_name)
    subquerySql = nextScope.lock(true).select('id').to_sql
    reserved = self.find_by_sql(["UPDATE #{quotedTableName} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquerySql}) RETURNING *",now,worker.name])
    return reserved[0]
  else
    # This works on MySQL and other DBs that support UPDATE...LIMIT. It uses separate queries to lock and return the job
    count = nextScope.update_all(:locked_at => now, :locked_by => worker.name)
    return nil if count == 0
    return self.where(:locked_at => now, :locked_by => worker.name).first
  end
end

.set_delayed_job_table_nameObject



15
16
17
18
# File 'lib/delayed/backend/active_record.rb', line 15

def self.set_delayed_job_table_name
  delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
  self.table_name = delayed_job_table_name
end

Instance Method Details

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/delayed/backend/active_record.rb', line 75

def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now
  affected_rows = if locked_by != worker
    # We don't own this job so we will update the locked_by name and the locked_at
    self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
  else
    # We already own this job, this may happen if the job queue crashes.
    # Simply resume and update the locked_at
    self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
  end
  if affected_rows == 1
    self.locked_at = now
    self.locked_by = worker
    self.changed_attributes.clear
    return true
  else
    return false
  end
end

#reload(*args) ⇒ Object



108
109
110
111
# File 'lib/delayed/backend/active_record.rb', line 108

def reload(*args)
  reset
  super
end