Class: Delayed::Backend::ActiveRecord::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::Backend::ActiveRecord::Job
- Includes:
- Base
- Defined in:
- lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Class Method Summary collapse
- .after_fork ⇒ Object
- .before_fork ⇒ Object
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
-
.db_time_now ⇒ Object
Get the current time (GMT or local depending on DB) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
-
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
- .rails3? ⇒ Boolean
Instance Method Summary collapse
-
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker.
- #reload(*args) ⇒ Object
Class Method Details
.after_fork ⇒ Object
35 36 37 |
# File 'lib/delayed/backend/active_record.rb', line 35 def self.after_fork ::ActiveRecord::Base.establish_connection end |
.before_fork ⇒ Object
31 32 33 |
# File 'lib/delayed/backend/active_record.rb', line 31 def self.before_fork ::ActiveRecord::Base.clear_all_connections! end |
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
40 41 42 |
# File 'lib/delayed/backend/active_record.rb', line 40 def self.clear_locks!(worker_name) update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) end |
.db_time_now ⇒ Object
Get the current time (GMT or local depending on DB) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.
82 83 84 85 86 87 88 89 90 |
# File 'lib/delayed/backend/active_record.rb', line 82 def self.db_time_now if Time.zone Time.zone.now elsif ::ActiveRecord::Base.default_timezone == :utc Time.now.utc else Time.now end end |
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/delayed/backend/active_record.rb', line 45 def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) scope = self.ready_to_run(worker_name, max_run_time) scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any? ::ActiveRecord::Base.silence do scope.by_priority.all(:limit => limit) end end |
.rails3? ⇒ Boolean
15 16 17 |
# File 'lib/delayed/backend/active_record.rb', line 15 def self.rails3? ::ActiveRecord::VERSION::MAJOR == 3 end |
Instance Method Details
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/delayed/backend/active_record.rb', line 58 def lock_exclusively!(max_run_time, worker) now = self.class.db_time_now affected_rows = if locked_by != worker # We don't own this job so we will update the locked_by name and the locked_at self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) else # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end if affected_rows == 1 self.locked_at = now self.locked_by = worker self.locked_at_will_change! self.locked_by_will_change! return true else return false end end |
#reload(*args) ⇒ Object
92 93 94 95 |
# File 'lib/delayed/backend/active_record.rb', line 92 def reload(*args) reset super end |