Class: Steve::QueuedJob

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/steve/queued_job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup(age = 5.days.ago) ⇒ Object

Remove old completed jobs from the database



221
222
223
# File 'lib/steve/queued_job.rb', line 221

# Delete completed jobs whose run_at is earlier than the given cutoff.
#
# @param age [Time] cutoff time; completed jobs with run_at before it are removed
# @return whatever delete_all returns for the matching rows
def self.cleanup(age = 5.days.ago)
  stale_jobs = where("status = ? and run_at < ?", 'completed', age)
  stale_jobs.delete_all
end

.execute_jobs(queue = '*', limit = 5) ⇒ Object

Execute pending jobs from the queue, each in a forked child process. Returns an array of the jobs that were executed; the array is empty if no jobs were found or no locks could be obtained for them.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/steve/queued_job.rb', line 46

# Execute pending jobs from the queue, each in its own forked child process.
#
# Candidates are jobs whose status is 'pending' or 'delayed', that have no
# worker assigned and whose run_at is not in the future, ordered by ascending
# priority. They are attempted in random order, and a job is only run when
# its lock can be acquired.
#
# @param queue [String, nil] queue to draw from; nil or '*' means any queue
# @param limit [Integer] maximum number of candidate jobs to fetch
# @return [Array] the jobs that were locked and executed (possibly empty)
def self.execute_jobs(queue = '*', limit = 5)
  jobs = where(:status => ['pending', 'delayed'], :worker => nil)
  # Honour the caller's limit (this was previously hard-coded to 5).
  jobs = jobs.where(["run_at <= ?", Time.now.utc]).order("priority asc").limit(limit)
  jobs = jobs.where(:queue => queue) unless queue.nil? || queue == '*'
  pending_jobs = jobs.all

  jobs_executed = []
  # Randomise the order so that competing workers are less likely to
  # contend for the same job first.
  pending_jobs.sort_by { rand }.each do |job|
    Steve.log "[#{job.id}] Attempt to aquire lock"
    if job.lock
      Steve.log "[#{job.id}] Lock acquired"
      if @child = fork
        # Parent process: @child holds the pid of the forked child.
        # NOTE(review): the value of this rand call is discarded; presumably
        # it advances the parent's random sequence - confirm before removing.
        rand
        Steve.log "[#{job.id}] Forked to #{@child}"
        $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name
        # NOTE(review): waits for any child process, not specifically @child.
        Process.wait
      else
        # Child process: run the job, then exit immediately via exit!.
        Steve.log "[#{job.id}] Executing"
        $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}"
        Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc)
        job.execute
        exit!
      end
      jobs_executed << job
    else
      Steve.log "[#{job.id}] Lock could not be acquired. Moving on."
    end
  end
  jobs_executed
end

.queue(klass, params = {}, &block) ⇒ Object

Queue a new job for processing. Returns true or false depending on whether the job has been queued or not.



34
35
36
37
38
39
40
41
42
# File 'lib/steve/queued_job.rb', line 34

# Build and save a queued job record for the given job class.
#
# The queue and priority are read from the @queue and @priority instance
# variables of klass. When a block is given it receives the unsaved record,
# so the caller can adjust it before it is saved.
#
# @param klass [Class] the job class; its name is stored as the job
# @param params [Hash] parameters stored on the job
# @return the result of saving the record
def self.queue(klass, params = {}, &block)
  record = new
  record.job      = klass.to_s
  record.params   = params
  record.queue    = klass.instance_variable_get('@queue')
  record.priority = klass.instance_variable_get('@priority')
  yield(record) if block_given?
  record.save
end

Instance Method Details

#archive_job? ⇒ Boolean

In a state appropriate to be archived?

Returns:

  • (Boolean)


212
213
214
# File 'lib/steve/queued_job.rb', line 212

# Whether this job's status is 'failed' or 'completed'.
#
# @return [Boolean]
def archive_job?
  %w[failed completed].include?(status)
end

#associate_with(object) ⇒ Object

Associate this job with the passed Active Record object



206
207
208
209
# File 'lib/steve/queued_job.rb', line 206

# Attach the given record as this job's associated object and persist the
# change, skipping validations.
#
# @param object the record to associate with this job
# @return the result of the save call
def associate_with(object)
  self.associated_object = object
  save(:validate => false)
end

#associated_object ⇒ Object

A job can belong to another Active Record object (polymorphic association).



13
# File 'lib/steve/queued_job.rb', line 13

# Polymorphic belongs_to association named associated_object.
belongs_to :associated_object, :polymorphic => true

#delay!(delay_time = 30.seconds) ⇒ Object

Delay this job by the time specified



196
197
198
199
200
201
202
203
# File 'lib/steve/queued_job.rb', line 196

# Push this job back so that it runs again later.
#
# Marks the job as 'delayed', clears its worker and start time so it can be
# picked up again, schedules it for now + delay_time, increments the retry
# counter and saves without validation.
#
# @param delay_time how long from now (UTC) the job should next run
# @return the result of the save call
def delay!(delay_time = 30.seconds)
  self.worker = nil
  self.started_at = nil
  self.run_at = Time.now.utc + delay_time
  self.status = 'delayed'
  self.retries += 1
  save(:validate => false)
end

#execute ⇒ Object

Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/steve/queued_job.rb', line 78

# Execute this job, catching errors raised by the job and recording the
# outcome on this record.
#
# The class named by the job attribute is instantiated with this record. If
# the instance does not respond to perform, the job is marked as failed and
# false is returned. Otherwise the job is marked as started, STDOUT and
# STDERR are redirected to output_file, and perform is called:
#
# * no exception            - the job is marked as completed
# * Steve::Job::Delay       - the job is delayed for another attempt, or
#                             failed once retries has reached
#                             Steve.max_job_retries (5 when that is unset)
# * Timeout::Error          - the database connection is re-established and
#                             the job is failed with 'Timed out'
# * Steve::Job::Error       - the job is failed with the exception message
# * any other StandardError - the job is failed with the message and
#                             backtrace, and the exception is reported to
#                             Airbrake and/or Raven when they are defined
#
# In every case the record is then saved without validation, the output
# streams are flushed and the output file is removed if it exists.
#
# @return [false] when the job object does not respond to perform;
#   otherwise the value of the last log call
def execute
  job = self.job.constantize.new(self)
  if job.respond_to?(:perform)
    start!
    STDOUT.reopen(output_file) && STDERR.reopen(output_file)
    begin
      job.perform
      success!
      Steve.log "[#{self.id}] Succeeded"
    rescue Steve::Job::Delay => e
      max_attempts = (Steve.max_job_retries || 5)
      if self.retries >= max_attempts
        fail!("#{e.message} after #{max_attempts} attempt(s)")
        Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.message}"
      else
        self.error = e.message
        delay!
        Steve.log "[#{self.id}] Delayed ('#{e.message}')"
      end
    rescue Timeout::Error => e
      # Bind the exception so the log line below reports its message; without
      # the binding `e` was nil here and the message was logged as empty.
      reconnect_database!
      fail!('Timed out')
      Steve.log "[#{self.id}] Timed out: #{e.to_s}"
    rescue => e
      if e.is_a?(Steve::Job::Error)
        fail!(e.message)
      else
        # Mark the job as failed before notifying the external services, so
        # the failure is recorded even if a notifier raises. (A second,
        # identical fail! call after the notifications was redundant.)
        fail!([e.to_s, e.backtrace].join("\n"))
        if defined?(Airbrake)
          begin
            Timeout.timeout(5) do
              Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params)
            end
          rescue Timeout::Error
            reconnect_database!
            Steve.log "[#{self.id}] Timed out sending exception to Airbrake"
          end
        end
        if defined?(Raven)
          Raven.capture_exception(e, :tags => {:component => 'worker'}, :extra => {:steve => {
            :id          => self.id,
            :priority    => self.priority,
            :attempts    => self.retries,
            :run_at      => self.run_at,
            :queue       => self.queue,
            :created_at  => self.created_at,
            :params      => self.params
          }})
        end
      end
      Steve.log "[#{self.id}] Failed: #{e.to_s}"
    end
  else
    fail! "#{self.id} did not respond to 'perform'"
    Steve.log "[#{self.id}] Failed: does not respond to 'perform'"
    return false
  end
ensure
  # Persist whatever state was reached, then clean up the redirected output.
  self.save(:validate => false)
  STDOUT.flush
  STDERR.flush

  FileUtils.rm(output_file) if File.exist?(output_file)
end

#fail!(message) ⇒ Object

Mark this job as failed.



174
175
176
177
178
# File 'lib/steve/queued_job.rb', line 174

# Flag the job as 'failed', store the given message as its error and record
# the finish time via finish!.
#
# @param message [String] description of the failure
# @return the result of finish!
def fail!(message)
  self.status = 'failed'
  self.error = message
  finish!
end

#finish! ⇒ Object

Mark this job as finished



181
182
183
# File 'lib/steve/queued_job.rb', line 181

# Stamp the job with the current UTC time as its finish time.
#
# @return [Time] the time that was assigned
def finish!
  completed_at = Time.now.utc
  self.finished_at = completed_at
end

#lock ⇒ Object

Get a lock on this job. Returns true if the lock was successful otherwise it returns false.



157
158
159
160
161
162
163
164
165
# File 'lib/steve/queued_job.rb', line 157

# Try to claim this job for the current worker.
#
# Issues a conditional update that sets the worker column to
# Steve.worker_name only where the row has this id and no worker yet. The
# claim succeeded when exactly one row was updated.
#
# @return [Boolean] true if the lock was obtained, false otherwise
def lock
  claimed = self.class.where(:id => id, :worker => nil).update_all(:worker => Steve.worker_name)
  return false unless claimed == 1

  self.worker = Steve.worker_name
  true
end

#output ⇒ Object

Get the output of a job, even if the job is still in progress. Live output is read from a file on the local filesystem, so it won’t work very well in a multiple app-server environment.



147
148
149
150
151
152
153
# File 'lib/steve/queued_job.rb', line 147

# The job's output: the stored output attribute when it is set, otherwise the
# current contents of the output file if that file exists.
#
# @return [String, nil] nil when there is neither stored output nor a file
def output
  stored = read_attribute(:output)
  return stored if stored

  File.read(output_file) if File.exist?(output_file)
end

#output_file ⇒ Object



216
217
218
# File 'lib/steve/queued_job.rb', line 216

# Path of the file that holds this job's output: /tmp/steve-job-<id>.
# The path is computed once and memoized on the instance.
#
# @return [String]
def output_file
  return @output_file if @output_file

  @output_file = File.join('', 'tmp', "steve-job-#{id}")
end

#params ⇒ Object

Serialize the options



10
# File 'lib/steve/queued_job.rb', line 10

# Store the params attribute in serialized form (ActiveRecord serialize).
serialize :params

#pending ⇒ Object

Scopes



16
# File 'lib/steve/queued_job.rb', line 16

# Jobs whose status is 'pending' or 'delayed'.
scope :pending, lambda { where(:status => ['pending', 'delayed']) }

#reconnect_database! ⇒ Object



225
226
227
# File 'lib/steve/queued_job.rb', line 225

# Re-establish the model class's database connection.
#
# @return whatever the connection's reconnect! returns
def reconnect_database!
  connection = self.class.connection
  connection.reconnect!
end

#start! ⇒ Object

Mark this job as started



186
187
188
189
190
191
192
193
# File 'lib/steve/queued_job.rb', line 186

# Flag the job as 'running': record the current process id and the UTC
# start time, clear any previous error and finish time, then save without
# validation.
#
# @return the result of the save call
def start!
  self.status = 'running'
  self.job_pid = Process.pid
  self.started_at = Time.now.utc
  self.finished_at = nil
  self.error = nil
  save(:validate => false)
end

#success! ⇒ Object

Mark this job as completed successfully.



168
169
170
171
# File 'lib/steve/queued_job.rb', line 168

# Flag the job as 'completed' and record its finish time via finish!.
# No save is issued directly in this method.
def success!
  self.status = 'completed'
  finish!
end