Class: Delayed::Backend::ActiveRecord::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/active_record.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_fork ⇒ Object



72
73
74
# File 'lib/delayed/backend/active_record.rb', line 72

# Calls ActiveRecord::Base.establish_connection with no arguments and
# returns whatever that call returns.
# NOTE(review): judging by the name, this is meant to run in a process
# right after it has been forked (counterpart of before_fork) — confirm
# against the caller.
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end

.before_fork ⇒ Object



64
65
66
67
68
69
70
# File 'lib/delayed/backend/active_record.rb', line 64

# Clears every connection held by ActiveRecord's connection handler.
#
# On ActiveRecord 7.1.0 and later, clear_all_connections! is called with
# the :all argument; on older versions it is called without arguments.
#
# @return [Object] whatever clear_all_connections! returns
def self.before_fork
  installed_version = Gem::Version.new(::ActiveRecord::VERSION::STRING)
  pass_all_argument = installed_version >= Gem::Version.new("7.1.0")
  handler = ::ActiveRecord::Base.connection_handler

  pass_all_argument ? handler.clear_all_connections!(:all) : handler.clear_all_connections!
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



77
78
79
# File 'lib/delayed/backend/active_record.rb', line 77

# Releases every job locked by the given worker by setting both locked_by
# and locked_at back to nil on the matching rows.
#
# @param worker_name [Object] value compared against the locked_by column
# @return [Object] whatever update_all returns
def self.clear_locks!(worker_name)
  held_by_worker = where(locked_by: worker_name)
  held_by_worker.update_all(locked_by: nil, locked_at: nil)
end

.db_time_now ⇒ Object

Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.



188
189
190
191
192
193
194
195
196
# File 'lib/delayed/backend/active_record.rb', line 188

# Current time as the application sees it; the database is never queried,
# so all clients need synchronized clocks.
#
# Resolution order:
#   1. Time.zone.now when Time.zone is set,
#   2. Time.now.utc when default_timezone is :utc,
#   3. plain Time.now otherwise.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if default_timezone == :utc

  Time.now # rubocop:disable Rails/TimeZone
end

.default_timezone ⇒ Object



198
199
200
201
202
203
204
# File 'lib/delayed/backend/active_record.rb', line 198

# Reads ActiveRecord's default timezone setting.
#
# Uses ::ActiveRecord.default_timezone when the ActiveRecord module
# responds to it, and falls back to ::ActiveRecord::Base.default_timezone
# otherwise.
def self.default_timezone
  setting_owner = ::ActiveRecord.respond_to?(:default_timezone) ? ::ActiveRecord : ::ActiveRecord::Base
  setting_owner.default_timezone
end

.ready_to_run(worker_name, max_run_time) ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/delayed/backend/active_record.rb', line 55

# Scope of jobs that the given worker may pick up: jobs that have not
# failed and are either
#   * due (run_at <= now) and unlocked, or locked before
#     now - max_run_time, or
#   * already locked by this worker.
#
# @param worker_name [Object] value compared against the locked_by column
# @param max_run_time [Object] subtracted from the current time to get the
#   cut-off before which an existing lock is ignored
# @return [Object] whatever where returns
def self.ready_to_run(worker_name, max_run_time)
  due_cutoff = db_time_now
  stale_lock_cutoff = db_time_now - max_run_time
  conditions = "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL"

  where(conditions, due_cutoff, stale_lock_cutoff, worker_name)
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/delayed/backend/active_record.rb', line 81

# Builds the scope of candidate jobs for a worker and tries to reserve one.
#
# The candidate scope is ready_to_run narrowed by the min_priority,
# max_priority and for_queues scopes and ordered by by_priority; the
# actual locking is delegated to reserve_with_scope.
#
# @param worker [Object] must respond to name
# @param max_run_time [Object] defaults to Worker.max_run_time
# @return [Object] whatever reserve_with_scope returns
def self.reserve(worker, max_run_time = Worker.max_run_time)
  runnable = ready_to_run(worker.name, max_run_time)
  within_priority_bounds = runnable.min_priority.max_priority
  ready_scope = within_priority_bounds.for_queues.by_priority

  reserve_with_scope(ready_scope, worker, db_time_now)
end

.reserve_with_scope(ready_scope, worker, now) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/delayed/backend/active_record.rb', line 92

# Dispatches to the reservation implementation selected by the configured
# reserve_sql_strategy.
#
#   :optimized_sql - database-specific optimizations for faster lookups
#   :default_sql   - slower, but in some cases less problematic; see
#                    https://github.com/collectiveidea/delayed_job_active_record/pull/89
#
# Any other strategy value falls through and returns nil.
def self.reserve_with_scope(ready_scope, worker, now)
  strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

  if strategy == :optimized_sql
    reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  elsif strategy == :default_sql
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end

.reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object



118
119
120
121
122
123
124
125
126
127
# File 'lib/delayed/backend/active_record.rb', line 118

# The old-fashioned, tried-and-true, but possibly slower reservation path.
#
# Loads only the ids of up to worker.read_ahead candidates, then tries to
# lock them one at a time with an UPDATE restricted to ready_scope. The
# first candidate whose UPDATE touched exactly one row is reloaded (so the
# full record is read only after the lock succeeded) and returned. This can
# have a noticeable impact with large read_ahead settings and large job
# payloads.
#
# @return the reserved job, or nil when no candidate could be locked
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  candidates = ready_scope.limit(worker.read_ahead).select(:id)

  candidates.detect do |candidate|
    rows_locked = ready_scope.where(id: candidate.id).update_all(locked_at: now, locked_by: worker.name)
    next false unless rows_locked == 1

    candidate.reload
  end
end

.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/delayed/backend/active_record.rb', line 170

# Reservation path used for the "MSSQL" and "Teradata" adapters.
#
# Locks one ready job with a hand-written UPDATE and then reads the locked
# row back with a second query.
#
# @return the first job locked at +now+ by this worker, or nil when the
#   UPDATE reported 0
def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  # The MSSQL driver doesn't generate a limit clause when update_all is
  # called directly, so the limited SELECT is rendered to SQL by hand.
  limited_select_sql = ready_scope.limit(1).to_sql
  # select("id") doesn't generate a subquery, so wrap the SELECT to force one.
  id_subquery_sql = "SELECT id FROM (#{limited_select_sql}) AS x"
  quoted_table_name = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{id_subquery_sql})"
  affected = connection.execute(sanitize_sql([update_sql, now, worker.name]))

  # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set,
  # so the locked row is fetched with a separate query.
  where(locked_at: now, locked_by: worker.name, failed_at: nil).first unless affected == 0
end

.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/delayed/backend/active_record.rb', line 155

# Reservation path used for the "MySQL", "Mysql2" and "Trilogy" adapters.
#
# Works on MySQL and possibly other databases that support UPDATE...LIMIT.
# One query locks a job, a second query reads the locked job back.
#
# @return the first job locked at the truncated timestamp by this worker,
#   or nil when the UPDATE touched no rows
def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  # Drop the sub-second part of the timestamp. From MySQL 5.6.4 onwards
  # fractional seconds exist, but the stored datetime has no precision, so
  # the fraction is discarded on update while the WHERE clause of the
  # follow-up query would still compare with it. Truncating up front keeps
  # the written and the queried value identical.
  lock_time = now.change(usec: 0)

  rows_locked = ready_scope.limit(1).update_all(locked_at: lock_time, locked_by: worker.name)

  where(locked_at: lock_time, locked_by: worker.name, failed_at: nil).first unless rows_locked == 0
end

.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/delayed/backend/active_record.rb', line 129

# Reservation path used for the "PostgreSQL" and "PostGIS" adapters.
#
# PostgreSQL has no UPDATE...LIMIT, so a single custom statement is used:
# the subquery selects one ready job id with a row lock (FOR UPDATE), and
# the outer UPDATE locks that job and returns it via RETURNING *.
# http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
#
# Note: ActiveRecord would try to emulate UPDATE...LIMIT if update_all were
# called on a limited scope, but without FOR UPDATE, which would cause many
# locking conflicts.
#
# @return the first record returned by the UPDATE, or nil when there is none
def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
  locking_subquery = ready_scope.limit(1).lock(true).select("id").to_sql

  # On PostgreSQL >= 9.5, SKIP LOCKED keeps workers from blocking each other
  # while they compete for the next available job.
  # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
  skip_locked_supported = connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
  locking_subquery = "#{locking_subquery} SKIP LOCKED" if skip_locked_supported

  quoted_name = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{locking_subquery}) RETURNING *"

  find_by_sql([update_sql, now, worker.name]).first
end

.reserve_with_scope_using_optimized_sql(ready_scope, worker, now) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/delayed/backend/active_record.rb', line 104

# Picks the database-specific reservation implementation based on the
# connection's adapter_name:
#
#   "PostgreSQL", "PostGIS"        -> optimized Postgres path
#   "MySQL", "Mysql2", "Trilogy"   -> optimized MySQL path
#   "MSSQL", "Teradata"            -> optimized MSSQL path
#   anything else                  -> default SQL path
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  adapter = connection.adapter_name

  if %w[PostgreSQL PostGIS].include?(adapter)
    reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
  elsif %w[MySQL Mysql2 Trilogy].include?(adapter)
    reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  elsif %w[MSSQL Teradata].include?(adapter)
    reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  else
    # Fallback for unknown / other DBMS
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end

.set_delayed_job_table_name ⇒ Object



48
49
50
51
# File 'lib/delayed/backend/active_record.rb', line 48

# Sets this class's table_name to "delayed_jobs" prefixed with
# ActiveRecord::Base.table_name_prefix.
#
# @return [String] the table name that was assigned
def self.set_delayed_job_table_name
  prefix = ::ActiveRecord::Base.table_name_prefix
  self.table_name = "#{prefix}delayed_jobs"
end

Instance Method Details

#reload(*args) ⇒ Object



206
207
208
209
# File 'lib/delayed/backend/active_record.rb', line 206

# Reloads this job: calls +reset+ first, then defers to the superclass
# implementation of +reload+ with the same arguments (bare +super+).
# NOTE(review): +reset+ is defined outside this excerpt — presumably it
# clears cached state derived from the record; confirm before relying on it.
def reload(*args)
  reset
  super
end