Class: Delayed::Backend::ActiveRecord::Job
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - Delayed::Backend::ActiveRecord::Job
- Includes: Base
- Defined in: lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Class Method Summary
- .after_fork ⇒ Object
- .before_fork ⇒ Object
- .by_priority ⇒ Object
- .clear_locks!(worker_name) ⇒ Object
  When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
  Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
  Find a few candidate jobs to run (in case some immediately get locked by others).
- .rails3? ⇒ Boolean
- .ready_to_run(worker_name, max_run_time) ⇒ Object
Instance Method Summary
- #lock_exclusively!(max_run_time, worker) ⇒ Object
  Lock this job for this worker.
- #reload(*args) ⇒ Object
Class Method Details
.after_fork ⇒ Object
# File 'lib/delayed/backend/active_record.rb', line 40
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end
.before_fork ⇒ Object
# File 'lib/delayed/backend/active_record.rb', line 36
def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end
.by_priority ⇒ Object
# File 'lib/delayed/backend/active_record.rb', line 25
def self.by_priority
  order('priority ASC, run_at ASC')
end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
# File 'lib/delayed/backend/active_record.rb', line 45
def self.clear_locks!(worker_name)
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
# File 'lib/delayed/backend/active_record.rb', line 87
def self.db_time_now
  if Time.zone
    Time.zone.now
  elsif ::ActiveRecord::Base.default_timezone == :utc
    Time.now.utc
  else
    Time.now
  end
end
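The clock-synchronization note matters because lock expiry is computed from each client's own clock (see the `db_time_now - max_run_time` term in `.ready_to_run`). The following is a minimal plain-Ruby sketch of that effect, not library code: `lock_expired?` is a hypothetical helper, and the four-hour `max_run_time` and ten-minute skew are assumed values chosen for illustration.

```ruby
# Hypothetical helper: mirrors the "locked_at < now - max_run_time" comparison,
# but takes the client's idea of "now" as an explicit argument.
def lock_expired?(locked_at, max_run_time, client_now)
  locked_at < client_now - max_run_time
end

max_run_time = 4 * 60 * 60                  # assumed value, in seconds
true_now     = Time.utc(2024, 1, 1, 12, 0, 0)
locked_at    = true_now - (3 * 3600 + 55 * 60) # lock taken 3h55m ago

accurate_client = true_now
skewed_client   = true_now + 10 * 60        # clock runs 10 minutes fast

puts lock_expired?(locked_at, max_run_time, accurate_client) # lock still valid
puts lock_expired?(locked_at, max_run_time, skewed_client)   # lock looks expired
```

A client whose clock runs fast may therefore treat a still-valid lock as expired and pick up a job that another worker is running.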
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
# File 'lib/delayed/backend/active_record.rb', line 50
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  scope = self.ready_to_run(worker_name, max_run_time)
  scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
  scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
  scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?

  ::ActiveRecord::Base.silence do
    scope.by_priority.all(:limit => limit)
  end
end
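The narrowing steps in `.find_available` (optional priority bounds, optional queue filter, ordering, limit) can be imitated on an in-memory array. This is only a sketch under stated assumptions: `pick_candidates` is a hypothetical name, jobs are plain hashes rather than ActiveRecord rows, and the input is assumed to have already passed the `.ready_to_run` check.

```ruby
# Hypothetical in-memory equivalent of the scope chain in find_available.
# Each filter is applied only when its setting is present, as in the original.
def pick_candidates(jobs, min_priority: nil, max_priority: nil, queues: [], limit: 5)
  scope = jobs
  scope = scope.select { |j| j[:priority] >= min_priority } if min_priority
  scope = scope.select { |j| j[:priority] <= max_priority } if max_priority
  scope = scope.select { |j| queues.include?(j[:queue]) } if queues.any?
  # Same ordering as by_priority: priority ASC, then run_at ASC.
  scope.sort_by { |j| [j[:priority], j[:run_at]] }.first(limit)
end

t = Time.utc(2024, 1, 1, 12, 0, 0)
jobs = [
  { id: 1, priority: 5, queue: "mail",    run_at: t },
  { id: 2, priority: 0, queue: "mail",    run_at: t + 1 },
  { id: 3, priority: 0, queue: "mail",    run_at: t },
  { id: 4, priority: 0, queue: "reports", run_at: t }
]

p pick_candidates(jobs, queues: ["mail"], limit: 2).map { |j| j[:id] } # => [3, 2]
```

Fetching several candidates instead of one gives the worker fallbacks when another worker locks the first candidate between the query and the lock attempt.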
.rails3? ⇒ Boolean
# File 'lib/delayed/backend/active_record.rb', line 16
def self.rails3?
  ::ActiveRecord::VERSION::MAJOR == 3
end
.ready_to_run(worker_name, max_run_time) ⇒ Object
# File 'lib/delayed/backend/active_record.rb', line 22
def self.ready_to_run(worker_name, max_run_time)
  where('(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name)
end
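The SQL condition in `.ready_to_run` relies on `AND` binding tighter than `OR`, which is easy to misread. The sketch below restates it as a plain-Ruby predicate with explicit grouping. `JobRow` and `ready_to_run?` are hypothetical names used only for illustration, and the four-hour `max_run_time` is an assumed value.

```ruby
# Hypothetical stand-in for a row of the jobs table.
JobRow = Struct.new(:run_at, :locked_at, :locked_by, :failed_at)

# Plain-Ruby reading of:
#   (run_at <= now AND (locked_at IS NULL OR locked_at < now - max_run_time)
#    OR locked_by = worker_name) AND failed_at IS NULL
def ready_to_run?(job, worker_name, max_run_time, now)
  # The run_at check guards only the "unlocked or lock expired" branch.
  due_and_free = job.run_at <= now &&
                 (job.locked_at.nil? || job.locked_at < now - max_run_time)
  # A job already locked by this worker qualifies regardless of run_at.
  owned = job.locked_by == worker_name
  (due_and_free || owned) && job.failed_at.nil?
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
max_run_time = 4 * 60 * 60 # assumed value, in seconds

fresh   = JobRow.new(now - 60, nil, nil, nil)
held    = JobRow.new(now - 60, now - 30, "worker-b", nil)
expired = JobRow.new(now - 60, now - max_run_time - 1, "worker-b", nil)
mine    = JobRow.new(now + 60, now - 30, "worker-a", nil)
failed  = JobRow.new(now - 60, nil, nil, now - 10)

p ready_to_run?(fresh,   "worker-a", max_run_time, now) # => true
p ready_to_run?(held,    "worker-a", max_run_time, now) # => false
p ready_to_run?(expired, "worker-a", max_run_time, now) # => true
p ready_to_run?(mine,    "worker-a", max_run_time, now) # => true
p ready_to_run?(failed,  "worker-a", max_run_time, now) # => false
```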
Instance Method Details
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File 'lib/delayed/backend/active_record.rb', line 63
def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now
  affected_rows = if locked_by != worker
    # We don't own this job so we will update the locked_by name and the locked_at
    self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
  else
    # We already own this job, this may happen if the job queue crashes.
    # Simply resume and update the locked_at
    self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
  end
  if affected_rows == 1
    self.locked_at = now
    self.locked_by = worker
    self.locked_at_will_change!
    self.locked_by_will_change!
    return true
  else
    return false
  end
end
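The lock is taken with a conditional update: the row changes only if it is still unlocked (or its lock has expired), and the affected-row count tells the caller whether it won. The sketch below imitates that pattern in memory for the "we don't own this job" branch only. `FakeTable`, `update_where`, and `try_lock` are hypothetical names, and the mutex stands in for the atomicity a database gives a single `UPDATE` statement.

```ruby
# Hypothetical in-memory stand-in for the jobs table. update_where imitates
# "UPDATE ... SET changes WHERE condition" and returns the affected-row count.
class FakeTable
  def initialize(rows)
    @rows = rows
    @mutex = Mutex.new
  end

  def update_where(changes)
    @mutex.synchronize do
      matched = @rows.select { |row| yield(row) }
      matched.each { |row| row.merge!(changes) }
      matched.size
    end
  end
end

# Returns true only if this call changed exactly one row, i.e. took the lock.
def try_lock(table, id, worker, max_run_time, now)
  affected = table.update_where(locked_at: now, locked_by: worker) do |row|
    row[:id] == id &&
      (row[:locked_at].nil? || row[:locked_at] < now - max_run_time) &&
      row[:run_at] <= now
  end
  affected == 1
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
max_run_time = 4 * 60 * 60 # assumed value, in seconds
table = FakeTable.new([{ id: 1, run_at: now - 10, locked_at: nil, locked_by: nil }])

p try_lock(table, 1, "worker-a", max_run_time, now)                    # => true
p try_lock(table, 1, "worker-b", max_run_time, now)                    # => false
p try_lock(table, 1, "worker-b", max_run_time, now + max_run_time + 1) # => true
```

Because the check and the write happen in one statement, two workers racing for the same job cannot both see an affected-row count of 1.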
#reload(*args) ⇒ Object
# File 'lib/delayed/backend/active_record.rb', line 97
def reload(*args)
  reset
  super
end