Class: Delayed::Backend::ActiveRecord::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::Backend::ActiveRecord::Job
- Includes:
- Base
- Defined in:
- lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Class Method Summary collapse
- .after_fork ⇒ Object
- .before_fork ⇒ Object
- .by_priority ⇒ Object
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
-
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.
- .ready_to_run(worker_name, max_run_time) ⇒ Object
- .reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
- .set_delayed_job_table_name ⇒ Object
Instance Method Summary collapse
-
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker.
- #reload(*args) ⇒ Object
Class Method Details
.after_fork ⇒ Object
34 35 36 |
# File 'lib/delayed/backend/active_record.rb', line 34
# Calls establish_connection on ActiveRecord::Base. Judging by its name,
# this is meant to be invoked in a process right after it has been forked.
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end
.before_fork ⇒ Object
30 31 32 |
# File 'lib/delayed/backend/active_record.rb', line 30
# Calls clear_all_connections! on ActiveRecord::Base. Judging by its name,
# this is meant to be invoked just before the process forks.
def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end
.by_priority ⇒ Object
26 27 28 |
# File 'lib/delayed/backend/active_record.rb', line 26
# Orders jobs by ascending priority, then by ascending run_at.
def self.by_priority
  ordering = 'priority ASC, run_at ASC'
  order(ordering)
end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
39 40 41 |
# File 'lib/delayed/backend/active_record.rb', line 39
# Releases every lock held by the named worker by nulling locked_by and
# locked_at on all rows whose locked_by equals worker_name.
#
# worker_name - the value stored in locked_by for the worker's jobs.
#
# Returns whatever update_all returns.
def self.clear_locks!(worker_name)
  release_lock_sql = "locked_by = null, locked_at = null"
  held_by_worker = ["locked_by = ?", worker_name]
  update_all(release_lock_sql, held_by_worker)
end
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.
98 99 100 101 102 103 104 105 106 |
# File 'lib/delayed/backend/active_record.rb', line 98
# Returns the current time as computed in this process (the database is not
# queried):
#   * Time.zone.now when Time.zone is set,
#   * otherwise Time.now.utc when ActiveRecord's default_timezone is :utc,
#   * otherwise the plain local Time.now.
def self.db_time_now
  return Time.zone.now if Time.zone

  ::ActiveRecord::Base.default_timezone == :utc ? Time.now.utc : Time.now
end
.ready_to_run(worker_name, max_run_time) ⇒ Object
22 23 24 |
# File 'lib/delayed/backend/active_record.rb', line 22
# Scope of jobs that may be picked up by the named worker.
#
# A job matches when failed_at is NULL and either
#   * run_at has passed and the job is unlocked or its lock is older than
#     max_run_time, or
#   * the job is already locked by worker_name.
# (SQL's AND binds tighter than OR, which is what groups the first two
# conditions together inside the parentheses.)
#
# worker_name  - value compared against locked_by.
# max_run_time - subtracted from db_time_now to find expired locks.
def self.ready_to_run(worker_name, max_run_time)
  ready_sql = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL'
  where(ready_sql, db_time_now, db_time_now - max_run_time, worker_name)
end
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/delayed/backend/active_record.rb', line 43
# Locks the next eligible job for the given worker and returns it.
#
# The candidate set starts from ready_to_run and is narrowed by
# Worker.min_priority, Worker.max_priority and Worker.queues when those are
# set; it is then ordered with by_priority and limited to a single row.
#
# worker       - object responding to #name; the name is written to locked_by.
# max_run_time - passed through to ready_to_run (defaults to
#                Worker.max_run_time).
#
# Returns the reserved job, or nil when no row could be locked.
def self.reserve(worker, max_run_time = Worker.max_run_time)
  # Scope to filter to the single next eligible "ready to run" job.
  next_scope = ready_to_run(worker.name, max_run_time)
  next_scope = next_scope.where('priority >= ?', Worker.min_priority) if Worker.min_priority
  next_scope = next_scope.where('priority <= ?', Worker.max_priority) if Worker.max_priority
  next_scope = next_scope.where('queue IN (?)', Worker.queues) if Worker.queues.any?
  next_scope = next_scope.by_priority.limit(1)

  now = db_time_now
  connection = ::ActiveRecord::Base.connection

  if connection.adapter_name == 'PostgreSQL'
    # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT.
    # This locks the single record 'FOR UPDATE' in the subquery
    # (http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE).
    # Note: active_record would attempt to generate UPDATE...LIMIT like sql for postgres
    # if we use a .limit() filter, but it would not use 'FOR UPDATE' and we would have
    # many locking conflicts.
    #
    # The table name is quoted with quote_table_name (not quote_column_name) so that
    # schema-qualified names are quoted as table identifiers.
    quoted_table_name = connection.quote_table_name(table_name)
    subquery_sql = next_scope.lock(true).select('id').to_sql
    update_sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? " \
                 "WHERE id IN (#{subquery_sql}) RETURNING *"
    find_by_sql([update_sql, now, worker.name]).first
  else
    # This works on MySQL and other DBs that support UPDATE...LIMIT.
    # It uses separate queries to lock and return the job.
    count = next_scope.update_all(:locked_at => now, :locked_by => worker.name)
    return nil if count == 0

    where(:locked_at => now, :locked_by => worker.name).first
  end
end
.set_delayed_job_table_name ⇒ Object
15 16 17 18 |
# File 'lib/delayed/backend/active_record.rb', line 15
# Sets this model's table name to "delayed_jobs" preceded by
# ActiveRecord::Base.table_name_prefix.
#
# Returns the assigned table name.
def self.set_delayed_job_table_name
  prefix = ::ActiveRecord::Base.table_name_prefix
  self.table_name = "#{prefix}delayed_jobs"
end
Instance Method Details
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/delayed/backend/active_record.rb', line 75
# Tries to take (or refresh) the lock on this job for the given worker.
#
# max_run_time - locks older than this (converted with to_i) are treated as
#                expired and may be taken over.
# worker       - value written to / compared against locked_by.
#
# Returns true when exactly one row was updated, in which case the in-memory
# locked_at/locked_by are updated and the change tracking is cleared;
# returns false otherwise.
def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now

  affected_rows =
    if locked_by == worker
      # We already own this job, this may happen if the job queue crashes.
      # Simply resume and update the locked_at.
      self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
    else
      # We don't own this job so we will update the locked_by name and the locked_at.
      self.class.update_all(
        ["locked_at = ?, locked_by = ?", now, worker],
        ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]
      )
    end

  return false unless affected_rows == 1

  # Mirror the database row locally without marking the record dirty.
  self.locked_at = now
  self.locked_by = worker
  changed_attributes.clear
  true
end
#reload(*args) ⇒ Object
108 109 110 111 |
# File 'lib/delayed/backend/active_record.rb', line 108
# Calls reset before delegating to the inherited reload; the bare `super`
# forwards the received arguments unchanged.
def reload(*args)
  reset
  super
end