Class: Steve::QueuedJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Steve::QueuedJob
- Defined in:
- lib/steve/queued_job.rb
Class Method Summary collapse
-
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database.
-
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue.
-
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing.
Instance Method Summary collapse
-
#archive_job? ⇒ Boolean
Whether the job is in a state appropriate to be archived.
-
#associate_with(object) ⇒ Object
Associate this job with the passed Active Record object.
-
#associated_object ⇒ Object
The job can belong to another Active Record object (polymorphic association).
-
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified.
-
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
-
#fail!(message) ⇒ Object
Mark this job as failed.
-
#finish! ⇒ Object
Mark this job as finished.
-
#lock ⇒ Object
Get a lock on this job.
-
#output ⇒ Object
Get the output of a job, even if the job is in progress. Live output won’t work very well in a multiple app-server environment.
- #output_file ⇒ Object
-
#params ⇒ Object
Serialize the options.
-
#pending ⇒ Object
Scopes.
- #reconnect_database! ⇒ Object
-
#start! ⇒ Object
Mark this job as started.
-
#success! ⇒ Object
Mark this job as completed successfully.
Class Method Details
.cleanup(age = 5.days.ago) ⇒ Object
Remove old completed jobs from the database
221 222 223 |
# File 'lib/steve/queued_job.rb', line 221

# Remove old completed jobs from the database.
#
# @param age [Time] cutoff; rows with status 'completed' and a run_at earlier
#   than this are deleted (defaults to five days before the call)
# @return [Object] the result of delete_all on the matching rows
def self.cleanup(age = 5.days.ago)
  stale_jobs = where("status = ? and run_at < ?", 'completed', age)
  stale_jobs.delete_all
end
.execute_jobs(queue = '*', limit = 5) ⇒ Object
Execute a new job from the queue. Returns an array of the jobs that were executed; the array is empty if no job was found or no lock could be obtained.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/steve/queued_job.rb', line 46

# Execute runnable jobs from the queue, each one in its own forked child process.
#
# Candidates are jobs with status 'pending' or 'delayed', no worker assigned
# and a run_at that is not in the future, ordered by ascending priority.
# Each candidate is locked before it is run; jobs whose lock cannot be
# obtained are skipped.
#
# @param queue [String, nil] queue to take jobs from; nil or '*' means any queue
# @param limit [Integer] maximum number of candidate jobs loaded per call
# @return [Array] the jobs that were locked and run by this call (empty when none)
def self.execute_jobs(queue = '*', limit = 5)
  jobs = where(:status => ['pending', 'delayed'], :worker => nil)
  # Honour the caller's limit (previously a hard-coded 5 ignored the parameter).
  jobs = jobs.where(["run_at <= ?", Time.now.utc]).order("priority asc").limit(limit)
  jobs = jobs.where(:queue => queue) unless queue.nil? || queue == '*'

  jobs_executed = []
  # Candidates are visited in random order.
  jobs.all.sort_by { rand }.each do |job|
    Steve.log "[#{job.id}] Attempt to aquire lock"
    unless job.lock
      Steve.log "[#{job.id}] Lock could not be acquired. Moving on."
      next
    end

    Steve.log "[#{job.id}] Lock acquired"
    if (@child = fork)
      # Parent process: fork returned the child's pid.
      # NOTE(review): bare rand call kept from the original; its purpose is not evident here.
      rand
      Steve.log "[#{job.id}] Forked to #{@child}"
      $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name
      # Block until the child has exited before moving on to the next job.
      Process.wait
    else
      # Child process: run the job, then exit immediately without at_exit handlers.
      Steve.log "[#{job.id}] Executing"
      $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}"
      Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc)
      job.execute
      exit!
    end
    jobs_executed << job
  end
  jobs_executed
end
.queue(klass, params = {}, &block) ⇒ Object
Queue a new job for processing. Returns true or false depending on whether the job has been queued or not.
34 35 36 37 38 39 40 41 42 |
# File 'lib/steve/queued_job.rb', line 34

# Queue a new job for processing.
#
# The queue name and priority are read from the @queue and @priority
# instance variables of the job class. When a block is given it receives the
# unsaved record so the caller can adjust it before it is saved.
#
# @param klass [Class] job class; its name is stored in the job column
# @param params [Hash] parameters stored with the job
# @yield [job] the new, not yet saved, queued job
# @return [Boolean] the result of saving the record
def self.queue(klass, params = {}, &block)
  queued = new
  queued.job      = klass.to_s
  queued.params   = params
  queued.queue    = klass.instance_variable_get('@queue')
  queued.priority = klass.instance_variable_get('@priority')
  yield(queued) if block
  queued.save
end
Instance Method Details
#archive_job? ⇒ Boolean
In a state appropriate to be archived?
212 213 214 |
# File 'lib/steve/queued_job.rb', line 212

# Whether the job is in a state appropriate to be archived.
#
# @return [Boolean] true when the status is 'failed' or 'completed'
def archive_job?
  %w[failed completed].include?(status)
end
#associate_with(object) ⇒ Object
Associate this job with the passed Active Record object
206 207 208 209 |
# File 'lib/steve/queued_job.rb', line 206

# Associate this job with the given record and save the job without
# running validations.
#
# @param object [Object] record assigned to associated_object
# @return [Object] the result of the save call
def associate_with(object)
  self.associated_object = object
  save(:validate => false)
end
#associated_object ⇒ Object
Can belong to another active record object?
13 |
# File 'lib/steve/queued_job.rb', line 13 belongs_to :associated_object, :polymorphic => true |
#delay!(delay_time = 30.seconds) ⇒ Object
Delay this job by the time specified
196 197 198 199 200 201 202 203 |
# File 'lib/steve/queued_job.rb', line 196

# Delay this job by the time specified.
#
# Marks the job 'delayed', moves run_at into the future, clears the start
# time and worker, bumps the retry counter and saves without validations.
#
# @param delay_time [Numeric, Object] amount added to the current UTC time
#   to compute the new run_at (defaults to 30 seconds)
# @return [Object] the result of the save call
def delay!(delay_time = 30.seconds)
  self.worker     = nil
  self.started_at = nil
  self.run_at     = Time.now.utc + delay_time
  self.status     = 'delayed'
  self.retries   += 1
  save(:validate => false)
end
#execute ⇒ Object
Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/steve/queued_job.rb', line 78

# Execute this job, catching any errors if they occur and ensuring the job
# is started and finished as appropriate.
#
# The job class named in the job column is instantiated with this record and
# its perform method is called. Outcomes:
# * success                  -> success!
# * Steve::Job::Delay raised -> delay!, or fail! once the retry limit is reached
# * Timeout::Error raised    -> database reconnect, then fail!('Timed out')
# * Steve::Job::Error raised -> fail! with the exception message
# * any other StandardError  -> fail! with message and backtrace, reported to
#                               Airbrake and/or Raven when those are defined
#
# Whatever happens, the record is saved without validations, STDOUT/STDERR
# are flushed and the temporary output file is removed.
#
# @return [false, Object] false when the job object does not respond to
#   perform; otherwise the value of the last statement that ran
def execute
  job = self.job.constantize.new(self)
  unless job.respond_to?(:perform)
    fail! "#{self.id} did not respond to 'perform'"
    Steve.log "[#{self.id}] Failed: does not respond to 'perform'"
    return false
  end

  start!
  # From here on STDOUT and STDERR are redirected to the job's output file.
  STDOUT.reopen(output_file) && STDERR.reopen(output_file)
  begin
    job.perform
    success!
    Steve.log "[#{self.id}] Succeeded"
  rescue Steve::Job::Delay => e
    max_attempts = (Steve.max_job_retries || 5)
    if self.retries >= max_attempts
      fail!("#{e.message} after #{max_attempts} attempt(s)")
      Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.message}"
    else
      self.error = e.message
      delay!
      Steve.log "[#{self.id}] Delayed ('#{e.message}')"
    end
  rescue Timeout::Error => e
    # Bind the exception so the log line below reports it (it was unbound before).
    reconnect_database!
    fail!('Timed out')
    Steve.log "[#{self.id}] Timed out: #{e.to_s}"
  rescue => e
    if e.is_a?(Steve::Job::Error)
      fail!(e.message)
    else
      # Mark the job failed first so the state is recorded even if reporting raises.
      fail!([e.to_s, e.backtrace].join("\n"))
      if defined?(Airbrake)
        begin
          # Reporting is best-effort: give up after five seconds.
          Timeout.timeout(5) do
            Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params)
          end
        rescue Timeout::Error
          reconnect_database!
          Steve.log "[#{self.id}] Timed out sending exception to Airbrake"
        end
      end
      if defined?(Raven)
        Raven.capture_exception(e, :tags => {:component => 'worker'}, :extra => {:steve => {
          :id => self.id,
          :priority => self.priority,
          :attempts => self.retries,
          :run_at => self.run_at,
          :queue => self.queue,
          :created_at => self.created_at,
          :params => self.params
        }})
      end
    end
    Steve.log "[#{self.id}] Failed: #{e.to_s}"
  end
ensure
  self.save(:validate => false)
  STDOUT.flush
  STDERR.flush
  FileUtils.rm(output_file) if File.exist?(output_file)
end
#fail!(message) ⇒ Object
Mark this job as failed.
174 175 176 177 178 |
# File 'lib/steve/queued_job.rb', line 174

# Mark this job as failed.
#
# Stores the failure message in error, sets the status to 'failed' and then
# calls finish!. This method itself does not save the record.
#
# @param message [String] description of why the job failed
# @return [Object] whatever finish! returns
def fail!(message)
  self.error = message
  self.status = 'failed'
  finish!
end
#finish! ⇒ Object
Mark this job as finished
181 182 183 |
# File 'lib/steve/queued_job.rb', line 181

# Mark this job as finished by stamping finished_at with the current UTC time.
# The record is not saved here.
#
# @return [Time] the timestamp that was assigned
def finish!
  completed_at = Time.now.utc
  self.finished_at = completed_at
end
#lock ⇒ Object
Get a lock on this job. Returns true if the lock was successful otherwise it returns false.
157 158 159 160 161 162 163 164 165 |
# File 'lib/steve/queued_job.rb', line 157

# Get a lock on this job.
#
# The lock is taken with a single conditional UPDATE: the worker column is
# set to this worker's name only if it is still nil for this job's row.
#
# @return [Boolean] true if exactly one row was updated (lock obtained),
#   false otherwise
def lock
  unclaimed = self.class.where(:id => self.id, :worker => nil)
  claimed_rows = unclaimed.update_all(:worker => Steve.worker_name)
  return false unless claimed_rows == 1

  # Mirror the database change on the in-memory record.
  self.worker = Steve.worker_name
  true
end
#output ⇒ Object
Get the output of a job, even if the job is in progress. Live output won’t work very well in a multiple app-server environment.
147 148 149 150 151 152 153 |
# File 'lib/steve/queued_job.rb', line 147

# Get the output of a job, even if the job is still in progress.
#
# Prefers the stored output attribute; otherwise falls back to reading the
# job's temporary output file when that file exists.
#
# @return [String, nil] the job output, or nil when neither source is available
def output
  stored = read_attribute(:output)
  return stored if stored

  File.read(output_file) if File.exist?(output_file)
end
#output_file ⇒ Object
216 217 218 |
# File 'lib/steve/queued_job.rb', line 216

# Path of the temporary file that holds this job's output. The path is built
# from the job id and memoized on first use.
#
# @return [String] the output file path
def output_file
  return @output_file if @output_file

  @output_file = File.join('', 'tmp', "steve-job-#{self.id}")
end
#params ⇒ Object
Serialize the options
10 |
# File 'lib/steve/queued_job.rb', line 10 serialize :params |
#pending ⇒ Object
Scopes
16 |
# File 'lib/steve/queued_job.rb', line 16 scope :pending, lambda { where(:status => ['pending', 'delayed']) } |
#reconnect_database! ⇒ Object
225 226 227 |
# File 'lib/steve/queued_job.rb', line 225

# Re-establish the model class's database connection.
#
# @return [Object] the result of reconnect! on the connection
def reconnect_database!
  db_connection = self.class.connection
  db_connection.reconnect!
end
#start! ⇒ Object
Mark this job as started
186 187 188 189 190 191 192 193 |
# File 'lib/steve/queued_job.rb', line 186

# Mark this job as started.
#
# Sets the status to 'running', records the current process id and start
# time, clears any previous error and finish time, and saves the record
# without validations.
#
# @return [Object] the result of the save call
def start!
  self.status      = 'running'
  self.job_pid     = Process.pid
  self.started_at  = Time.now.utc
  self.finished_at = nil
  self.error       = nil
  save(:validate => false)
end
#success! ⇒ Object
Mark this job as completed successfully.
168 169 170 171 |
# File 'lib/steve/queued_job.rb', line 168

# Mark this job as completed successfully: sets the status to 'completed'
# and then calls finish!. The record is not saved here.
#
# @return [Object] whatever finish! returns
def success!
  self.status = 'completed'
  finish!
end