Class: Steve::QueuedJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Steve::QueuedJob
- Defined in:
- lib/steve/queued_job.rb
Class Method Summary collapse
-
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database.
-
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue.
-
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing.
Instance Method Summary collapse
-
#associate_with(object) ⇒ Object
Associate this job with the pased active record object.
-
#associated_object ⇒ Object
Can belong to another active record object?.
-
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified.
-
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
-
#fail!(message) ⇒ Object
Mark this job as failed.
-
#finish! ⇒ Object
Mark this job as finished.
-
#lock ⇒ Object
Get a lock on this job.
-
#params ⇒ Object
Serialize the options.
-
#pending ⇒ Object
Scopes.
-
#start! ⇒ Object
Mark this job as started.
-
#success! ⇒ Object
Mark this job as succeeded successfully.
Class Method Details
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database
187 188 189 |
# File 'lib/steve/queued_job.rb', line 187 def self.cleanup(age = 5.days.ago) self.delete_all(["status = 'completed' and run_at < ?", age]) end |
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue. Returns true if a job was executed, or false if a job was not found or we couldn’t obtain locks for them.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/steve/queued_job.rb', line 42 def self.execute_jobs(queue = '*', limit = 5) pending_jobs = ActiveRecord::Base.silence do jobs = self.where(:status => ['pending', 'delayed'], :worker => nil).where(["run_at <= ?", Time.now.utc]).order("priority asc").limit(5) jobs = jobs.where(:queue => queue) unless queue.nil? or queue == '*' jobs.all end jobs_executed = Array.new for job in pending_jobs.sort_by { rand() } Steve.log "[#{job.id}] Attempt to aquire lock" if job.lock Steve.log "[#{job.id}] Lock acquired" ActiveRecord::Base.remove_connection if @child = fork rand Steve.log "[#{job.id}] Forked to #{@child}" $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name ActiveRecord::Base.establish_connection Process.wait else Steve.log "[#{job.id}] Executing" $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}" ActiveRecord::Base.establish_connection Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc) job.execute exit end jobs_executed << job else Steve.log "[#{job.id}] Lock could not be acquired. Moving on." end end jobs_executed end |
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing. Returns true or false depending whether the job has been queued or not.
31 32 33 34 35 36 37 38 |
# File 'lib/steve/queued_job.rb', line 31 def self.queue(klass, params = {}, &block) job = self.new job.job = klass.to_s job.params = params job.queue = klass.instance_variable_get('@queue') block.call(job) if block_given? job.save end |
Instance Method Details
#associate_with(object) ⇒ Object
Associate this job with the pased active record object
181 182 183 184 |
# File 'lib/steve/queued_job.rb', line 181 def associate_with(object) self.associated_object = object self.save(:validate => false) end |
#associated_object ⇒ Object
Can belong to another active record object?
13 |
# File 'lib/steve/queued_job.rb', line 13 belongs_to :associated_object, :polymorphic => true |
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified
171 172 173 174 175 176 177 178 |
# File 'lib/steve/queued_job.rb', line 171 def delay!(delay_time = 30.seconds) self.status = 'delayed' self.run_at = Time.now.utc + delay_time self.started_at = nil self.worker = nil self.retries += 1 self.save(:validate => false) end |
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/steve/queued_job.rb', line 79 def execute @output_file = File.join('', 'tmp', "steve-job-#{self.id}") job = self.job.constantize.new(self) if job.respond_to?(:perform) start! STDOUT.reopen(@output_file) && STDERR.reopen(@output_file) begin job.perform success! Steve.log "[#{self.id}] Succeeded" rescue Steve::Job::Delay => e max_attempts = (Steve.max_job_retries || 5) if self.retries >= max_attempts fail!("#{e.} after #{max_attempts} attempt(s)") Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.}" else self.error = e. delay! Steve.log "[#{self.id}] Delayed ('#{e.}')" end rescue Timeout::Error fail!('Timed out') Steve.log "[#{self.id}] Timed out: #{e.to_s}" rescue => e if e.is_a?(Steve::Job::Error) fail!(e.) else if defined?(Airbrake) Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params) end fail!([e.to_s, e.backtrace].join("\n")) end Steve.log "[#{self.id}] Failed: #{e.to_s}" end else fail! "#{self.id} did not respond to 'perform'" Steve.log "[#{self.id}] Failed: does not respond to 'perform'" return false end ensure STDOUT.flush && STDERR.flush if File.exist?(@output_file) self.output = File.read(@output_file) FileUtils.rm(@output_file) end if self.status == 'completed' && Steve.delete_successful_jobs self.destroy else self.save(:validate => false) end end |
#fail!(message) ⇒ Object
Mark this job as failed.
150 151 152 153 154 |
# File 'lib/steve/queued_job.rb', line 150 def fail!() self.error = self.status = 'failed' finish! end |
#finish! ⇒ Object
Mark this job as finished
157 158 159 |
# File 'lib/steve/queued_job.rb', line 157 def finish! self.finished_at = Time.now.utc end |
#lock ⇒ Object
Get a lock on this job. Returns true if the lock was successful otherwise it returns false.
133 134 135 136 137 138 139 140 141 |
# File 'lib/steve/queued_job.rb', line 133 def lock rows = self.class.update_all({:worker => Steve.worker_name}, {:id => self.id, :worker => nil}) if rows == 1 self.worker = Steve.worker_name return true else return false end end |
#params ⇒ Object
Serialize the options
10 |
# File 'lib/steve/queued_job.rb', line 10 serialize :params |
#pending ⇒ Object
Scopes
16 |
# File 'lib/steve/queued_job.rb', line 16 scope :pending, where(:status => ['pending', 'delayed']) |
#start! ⇒ Object
Mark this job as started
162 163 164 165 166 167 168 |
# File 'lib/steve/queued_job.rb', line 162 def start! self.error = nil self.finished_at = nil self.started_at = Time.now.utc self.status = 'running' self.save(:validate => false) end |
#success! ⇒ Object
Mark this job as succeeded successfully.
144 145 146 147 |
# File 'lib/steve/queued_job.rb', line 144 def success! self.status = 'completed' finish! end |