Class: Steve::QueuedJob

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/steve/queued_job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup(age = 5.days.ago) ⇒ Object

Remove old completed jobs from the database



187
188
189
# File 'lib/steve/queued_job.rb', line 187

def self.cleanup(age = 5.days.ago)
  self.delete_all(["status = 'completed' and run_at < ?", age])
end

.execute_jobs(queue = '*', limit = 5) ⇒ Object

Execute a new job from the queue. Returns true if a job was executed, or false if a job was not found or we couldn’t obtain locks for them.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/steve/queued_job.rb', line 42

def self.execute_jobs(queue = '*', limit = 5)
  pending_jobs = ActiveRecord::Base.silence do
    jobs = self.where(:status => ['pending', 'delayed'], :worker => nil).where(["run_at <= ?", Time.now.utc]).order("priority asc").limit(5)
    jobs = jobs.where(:queue => queue) unless queue.nil? or queue == '*'
    jobs.all
  end

  jobs_executed = Array.new
  for job in pending_jobs.sort_by { rand() }
    Steve.log "[#{job.id}] Attempt to aquire lock"
    if job.lock 
      Steve.log "[#{job.id}] Lock acquired"
      ActiveRecord::Base.remove_connection
      if @child = fork
        rand
        Steve.log "[#{job.id}] Forked to #{@child}"
        $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name
        ActiveRecord::Base.establish_connection
        Process.wait
      else
        Steve.log "[#{job.id}] Executing"
        $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}"
        ActiveRecord::Base.establish_connection
        Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc)
        job.execute
        exit
      end
      jobs_executed << job
    else
      Steve.log "[#{job.id}] Lock could not be acquired. Moving on."
    end
  end
  jobs_executed
end

.queue(klass, params = {}, &block) ⇒ Object

Queue a new job for processing. Returns true or false depending whether the job has been queued or not.



31
32
33
34
35
36
37
38
# File 'lib/steve/queued_job.rb', line 31

def self.queue(klass, params = {}, &block)
  job = self.new
  job.job = klass.to_s
  job.params = params
  job.queue = klass.instance_variable_get('@queue')
  block.call(job) if block_given?
  job.save
end

Instance Method Details

#associate_with(object) ⇒ Object

Associate this job with the pased active record object



181
182
183
184
# File 'lib/steve/queued_job.rb', line 181

def associate_with(object)
  self.associated_object = object
  self.save(:validate => false)
end

#associated_objectObject

Can belong to another active record object?



13
# File 'lib/steve/queued_job.rb', line 13

belongs_to :associated_object, :polymorphic => true

#delay!(delay_time = 30.seconds) ⇒ Object

Delay this job by the time specified



171
172
173
174
175
176
177
178
# File 'lib/steve/queued_job.rb', line 171

def delay!(delay_time = 30.seconds)
  self.status = 'delayed'
  self.run_at = Time.now.utc + delay_time
  self.started_at = nil
  self.worker = nil
  self.retries += 1
  self.save(:validate => false)
end

#executeObject

Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/steve/queued_job.rb', line 79

def execute
  @output_file = File.join('', 'tmp', "steve-job-#{self.id}")
  job = self.job.constantize.new(self)
  if job.respond_to?(:perform)
    start!
    STDOUT.reopen(@output_file) && STDERR.reopen(@output_file)
    begin
      job.perform
      success!
      Steve.log "[#{self.id}] Succeeded"
    rescue Steve::Job::Delay => e
      max_attempts = (Steve.max_job_retries || 5)
      if self.retries >= max_attempts
        fail!("#{e.message} after #{max_attempts} attempt(s)")
        Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.message}"
      else
        self.error = e.message
        delay!
        Steve.log "[#{self.id}] Delayed ('#{e.message}')"            
      end
    rescue Timeout::Error
      fail!('Timed out')
      Steve.log "[#{self.id}] Timed out: #{e.to_s}"
    rescue => e
      if e.is_a?(Steve::Job::Error)
        fail!(e.message)
      else
        if defined?(Airbrake)
          Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params)
        end
        fail!([e.to_s, e.backtrace].join("\n"))
      end
      Steve.log "[#{self.id}] Failed: #{e.to_s}"
    end
  else
    fail! "#{self.id} did not respond to 'perform'"
    Steve.log "[#{self.id}] Failed: does not respond to 'perform'"
    return false
  end
ensure
  STDOUT.flush && STDERR.flush
  if File.exist?(@output_file)
    self.output = File.read(@output_file)
    FileUtils.rm(@output_file)
  end
  if self.status == 'completed' && Steve.delete_successful_jobs
    self.destroy
  else
    self.save(:validate => false)
  end
end

#fail!(message) ⇒ Object

Mark this job as failed.



150
151
152
153
154
# File 'lib/steve/queued_job.rb', line 150

def fail!(message)
  self.error = message
  self.status = 'failed'
  finish!
end

#finish!Object

Mark this job as finished



157
158
159
# File 'lib/steve/queued_job.rb', line 157

def finish!
  self.finished_at = Time.now.utc
end

#lockObject

Get a lock on this job. Returns true if the lock was successful otherwise it returns false.



133
134
135
136
137
138
139
140
141
# File 'lib/steve/queued_job.rb', line 133

def lock
  rows = self.class.update_all({:worker => Steve.worker_name}, {:id => self.id, :worker => nil})
  if rows == 1
    self.worker = Steve.worker_name
    return true
  else
    return false
  end
end

#paramsObject

Serialize the options



10
# File 'lib/steve/queued_job.rb', line 10

serialize :params

#pendingObject

Scopes



16
# File 'lib/steve/queued_job.rb', line 16

scope :pending, where(:status => ['pending', 'delayed'])

#start!Object

Mark this job as started



162
163
164
165
166
167
168
# File 'lib/steve/queued_job.rb', line 162

def start!
  self.error = nil
  self.finished_at = nil
  self.started_at = Time.now.utc
  self.status = 'running'
  self.save(:validate => false)
end

#success!Object

Mark this job as succeeded successfully.



144
145
146
147
# File 'lib/steve/queued_job.rb', line 144

def success!
  self.status = 'completed'
  finish!
end