Class: Resque::Job
- Inherits:
-
Object
- Object
- Resque::Job
- Extended by:
- Helpers
- Includes:
- Helpers
- Defined in:
- lib/resque/job.rb
Overview
A Resque::Job represents a unit of work. Each job lives on a single queue and has an associated payload object. The payload is a hash with two attributes: ‘class` and `args`. The `class` is the name of the Ruby class which should be used to run the job. The `args` are an array of arguments which should be passed to the Ruby class’s ‘perform` class-level method.
You can manually run a job using this code:
job = Resque::Job.reserve(:high)
klass = Resque::Job.constantize(job.payload['class'])
klass.perform(*job.payload['args'])
Constant Summary collapse
- DontPerform =
Raise Resque::Job::DontPerform from a before_perform hook to abort the job.
Class.new(StandardError)
Instance Attribute Summary collapse
-
#payload ⇒ Object
readonly
This job’s associated payload object.
-
#queue ⇒ Object
readonly
The name of the queue from which this job was pulled (or is to be placed).
-
#worker ⇒ Object
The worker object which is currently processing this job.
Class Method Summary collapse
-
.create(queue, klass, *args) ⇒ Object
Creates a job by placing it on a queue.
-
.destroy(queue, klass, *args) ⇒ Object
Removes a job from a queue.
-
.reserve(queue) ⇒ Object
Given a string queue name, returns an instance of Resque::Job if any jobs are available.
Instance Method Summary collapse
-
#==(other) ⇒ Object
Equality.
- #after_hooks ⇒ Object
-
#args ⇒ Object
Returns an array of args represented in this job’s payload.
- #around_hooks ⇒ Object
- #before_hooks ⇒ Object
-
#fail(exception) ⇒ Object
Given an exception object, hands off the needed parameters to the Failure module.
- #failure_hooks ⇒ Object
-
#initialize(queue, payload) ⇒ Job
constructor
A new instance of Job.
-
#inspect ⇒ Object
String representation.
-
#payload_class ⇒ Object
Returns the actual class constant represented in this job’s payload.
-
#perform ⇒ Object
Attempts to perform the work represented by this job instance.
-
#recreate ⇒ Object
Creates an identical job, essentially placing this job back on the queue.
- #run_failure_hooks(exception) ⇒ Object
Methods included from Helpers
classify, constantize, decode, encode, mongo_stats, mongo_workers
Constructor Details
#initialize(queue, payload) ⇒ Job
Returns a new instance of Job.
32 33 34 35 |
# File 'lib/resque/job.rb', line 32 def initialize(queue, payload) @queue = queue @payload = payload end |
Instance Attribute Details
#payload ⇒ Object (readonly)
This job’s associated payload object.
30 31 32 |
# File 'lib/resque/job.rb', line 30 def payload @payload end |
#queue ⇒ Object (readonly)
The name of the queue from which this job was pulled (or is to be placed)
27 28 29 |
# File 'lib/resque/job.rb', line 27 def queue @queue end |
#worker ⇒ Object
The worker object which is currently processing this job.
23 24 25 |
# File 'lib/resque/job.rb', line 23 def worker @worker end |
Class Method Details
.create(queue, klass, *args) ⇒ Object
Creates a job by placing it on a queue. Expects a string queue name, a string class name, and an optional array of arguments to pass to the class’ ‘perform` method.
Raises an exception if no queue or class is given.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/resque/job.rb', line 42 def self.create(queue, klass, *args) Resque.validate(klass, queue) item = { :class => klass.to_s, :args => args} #are we trying to put a non-delayed job into a delayed queue? if Resque.delayed_queue?(queue) if Resque.delayed_job?(klass) if args[0].is_a?(Hash) && args[0].has_key?(:delay_until) item[:delay_until] = args[0][:delay_until] else raise QueueError.new 'trying to insert delayed job without delay_until' end else raise QueueError.new 'trying to insert non-delayed job into delayed queue' end else if Resque.delayed_job?(klass) raise QueueError.new 'trying to insert a delayed job into a non-delayed queue' end end if Resque.inline? constantize(klass).perform(*decode(encode(args))) else Resque.push(queue, item) end end |
.destroy(queue, klass, *args) ⇒ Object
Removes a job from a queue. Expects a string queue name, a string class name, and, optionally, args.
Returns the number of jobs destroyed.
If no args are provided, it will remove all jobs of the class provided.
That is, for these two jobs:
{ ‘class’ => ‘UpdateGraph’, ‘args’ => [‘defunkt’] } { ‘class’ => ‘UpdateGraph’, ‘args’ => [‘mojombo’] }
The following call will remove both:
Resque::Job.destroy(queue, 'UpdateGraph')
Whereas specifying args will only remove the 2nd job:
Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')
This method can be potentially very slow and memory intensive, depending on the size of your queue, as it loads all jobs into a Ruby array before processing.
95 96 97 98 99 100 101 102 |
# File 'lib/resque/job.rb', line 95 def self.destroy(queue, klass, *args) collection = Resque.collection_for_queue(queue) selector = {'class' => klass.to_s} selector['args'] = args unless args.empty? destroyed = collection.find(selector).count collection.remove(selector, :safe => true) destroyed end |
Instance Method Details
#==(other) ⇒ Object
Equality
205 206 207 208 209 |
# File 'lib/resque/job.rb', line 205 def ==(other) queue == other.queue && payload_class == other.payload_class && args == other.args end |
#after_hooks ⇒ Object
219 220 221 |
# File 'lib/resque/job.rb', line 219 def after_hooks @after_hooks ||= Plugin.after_hooks(payload_class) end |
#args ⇒ Object
Returns an array of args represented in this job’s payload.
177 178 179 |
# File 'lib/resque/job.rb', line 177 def args @payload['args'] end |
#around_hooks ⇒ Object
215 216 217 |
# File 'lib/resque/job.rb', line 215 def around_hooks @around_hooks ||= Plugin.around_hooks(payload_class) end |
#before_hooks ⇒ Object
211 212 213 |
# File 'lib/resque/job.rb', line 211 def before_hooks @before_hooks ||= Plugin.before_hooks(payload_class) end |
#fail(exception) ⇒ Object
Given an exception object, hands off the needed parameters to the Failure module.
183 184 185 186 187 188 189 190 |
# File 'lib/resque/job.rb', line 183 def fail(exception) run_failure_hooks(exception) Failure.create \ :payload => payload, :exception => exception, :worker => worker, :queue => queue end |
#failure_hooks ⇒ Object
223 224 225 |
# File 'lib/resque/job.rb', line 223 def failure_hooks @failure_hooks ||= Plugin.failure_hooks(payload_class) end |
#inspect ⇒ Object
String representation
199 200 201 202 |
# File 'lib/resque/job.rb', line 199 def inspect obj = @payload "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ] end |
#payload_class ⇒ Object
Returns the actual class constant represented in this job’s payload.
172 173 174 |
# File 'lib/resque/job.rb', line 172 def payload_class @payload_class ||= constantize(@payload['class']) end |
#perform ⇒ Object
Attempts to perform the work represented by this job instance. Calls #perform on the class given in the payload with the arguments given in the payload.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/resque/job.rb', line 114 def perform job = payload_class job_args = args || [] job_was_performed = false begin # Execute before_perform hook. Abort the job gracefully if # Resque::DontPerform is raised. begin before_hooks.each do |hook| job.send(hook, *job_args) end rescue DontPerform return false end # Execute the job. Do it in an around_perform hook if available. if around_hooks.empty? job.perform(*job_args) job_was_performed = true else # We want to nest all around_perform plugins, with the last one # finally calling perform stack = around_hooks.reverse.inject(nil) do |last_hook, hook| if last_hook lambda do job.send(hook, *job_args) { last_hook.call } end else lambda do job.send(hook, *job_args) do result = job.perform(*job_args) job_was_performed = true result end end end end stack.call end # Execute after_perform hook after_hooks.each do |hook| job.send(hook, *job_args) end # Return true if the job was performed return job_was_performed # If an exception occurs during the job execution, look for an # on_failure hook then re-raise. rescue Object => e run_failure_hooks(e) raise e end end |
#recreate ⇒ Object
Creates an identical job, essentially placing this job back on the queue.
194 195 196 |
# File 'lib/resque/job.rb', line 194 def recreate self.class.create(queue, payload_class, *args) end |
#run_failure_hooks(exception) ⇒ Object
227 228 229 230 |
# File 'lib/resque/job.rb', line 227 def run_failure_hooks(exception) job_args = args || [] failure_hooks.each { |hook| payload_class.send(hook, exception, *job_args) } end |