Class: ResqueSqs::Job
- Inherits:
-
Object
- Object
- ResqueSqs::Job
- Defined in:
- lib/resque_sqs/job.rb
Overview
A ResqueSqs::Job represents a unit of work. Each job lives on a single queue and has an associated payload object. The payload is a hash with two attributes: ‘class` and `args`. The `class` is the name of the Ruby class which should be used to run the job. The `args` are an array of arguments which should be passed to the Ruby class’s ‘perform` class-level method.
You can manually run a job using this code:
job = ResqueSqs::Job.reserve(:high)
klass = ResqueSqs::Job.constantize(job.payload['class'])
klass.perform(*job.payload['args'])
Constant Summary collapse
- DontPerform =
Raise ResqueSqs::Job::DontPerform from a before_perform hook to abort the job.
Class.new(StandardError)
Instance Attribute Summary collapse
-
#payload ⇒ Object
readonly
This job’s associated payload object.
-
#queue ⇒ Object
readonly
The name of the queue from which this job was pulled (or is to be placed).
-
#worker ⇒ Object
The worker object which is currently processing this job.
Class Method Summary collapse
-
.create(queue, klass, *args) ⇒ Object
Creates a job by placing it on a queue.
-
.decode(object) ⇒ Object
Given a string, returns a Ruby object.
-
.destroy(queue, klass, *args) ⇒ Object
Removes a job from a queue.
-
.encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
- .redis ⇒ Object
-
.reserve(queue) ⇒ Object
Given a string queue name, returns an instance of ResqueSqs::Job if any jobs are available.
Instance Method Summary collapse
-
#==(other) ⇒ Object
Equality.
- #after_hooks ⇒ Object
-
#args ⇒ Object
Returns an array of args represented in this job’s payload.
- #around_hooks ⇒ Object
- #before_hooks ⇒ Object
-
#classify(dashed_word) ⇒ Object
Given a word with dashes, returns a camel cased version of it.
-
#constantize(camel_cased_word) ⇒ Object
Tries to find a constant with the name specified in the argument string:.
-
#decode(object) ⇒ Object
Given a string, returns a Ruby object.
-
#encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
-
#fail(exception) ⇒ Object
Given an exception object, hands off the needed parameters to the Failure module.
- #failure_hooks ⇒ Object
- #has_payload_class? ⇒ Boolean
-
#initialize(queue, payload) ⇒ Job
constructor
A new instance of Job.
-
#inspect ⇒ Object
String representation.
-
#payload_class ⇒ Object
Returns the actual class constant represented in this job’s payload.
-
#payload_class_name ⇒ Object
Returns the payload class as a string without raising NameError.
-
#perform ⇒ Object
Attempts to perform the work represented by this job instance.
-
#recreate ⇒ Object
Creates an identical job, essentially placing this job back on the queue.
- #redis ⇒ Object
- #run_failure_hooks(exception) ⇒ Object
Constructor Details
#initialize(queue, payload) ⇒ Job
Returns a new instance of Job.
134 135 136 137 138 |
# File 'lib/resque_sqs/job.rb', line 134 def initialize(queue, payload) @queue = queue @payload = payload @failure_hooks_ran = false end |
Instance Attribute Details
#payload ⇒ Object (readonly)
This job’s associated payload object.
132 133 134 |
# File 'lib/resque_sqs/job.rb', line 132 def payload @payload end |
#queue ⇒ Object (readonly)
The name of the queue from which this job was pulled (or is to be placed)
129 130 131 |
# File 'lib/resque_sqs/job.rb', line 129 def queue @queue end |
#worker ⇒ Object
The worker object which is currently processing this job.
125 126 127 |
# File 'lib/resque_sqs/job.rb', line 125 def worker @worker end |
Class Method Details
.create(queue, klass, *args) ⇒ Object
Creates a job by placing it on a queue. Expects a string queue name, a string class name, and an optional array of arguments to pass to the class’ ‘perform` method.
Raises an exception if no queue or class is given.
145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/resque_sqs/job.rb', line 145 def self.create(queue, klass, *args) ResqueSqs.validate(klass, queue) if ResqueSqs.inline? # Instantiating a ResqueSqs::Job and calling perform on it so callbacks run # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job new(:inline, {'class' => klass, 'args' => decode(encode(args))}).perform else ResqueSqs.push(queue, :class => klass.to_s, :args => args) end end |
.decode(object) ⇒ Object
Given a string, returns a Ruby object.
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/resque_sqs/job.rb', line 59 def self.decode(object) return unless object begin if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.load object else MultiJson.decode object end rescue ::MultiJson::DecodeError => e raise DecodeException, e., e.backtrace end end |
.destroy(queue, klass, *args) ⇒ Object
Removes a job from a queue. Expects a string queue name, a string class name, and, optionally, args.
Returns the number of jobs destroyed.
If no args are provided, it will remove all jobs of the class provided.
That is, for these two jobs:
{ ‘class’ => ‘UpdateGraph’, ‘args’ => [‘defunkt’] } { ‘class’ => ‘UpdateGraph’, ‘args’ => [‘mojombo’] }
The following call will remove both:
ResqueSqs::Job.destroy(queue, 'UpdateGraph')
Whereas specifying args will only remove the 2nd job:
ResqueSqs::Job.destroy(queue, 'UpdateGraph', 'mojombo')
This method can be potentially very slow and memory intensive, depending on the size of your queue, as it loads all jobs into a Ruby array before processing.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/resque_sqs/job.rb', line 181 def self.destroy(queue, klass, *args) klass = klass.to_s queue = "queue:#{queue}" destroyed = 0 if args.empty? redis.lrange(queue, 0, -1).each do |string| if decode(string)['class'] == klass destroyed += redis.lrem(queue, 0, string).to_i end end else destroyed += redis.lrem(queue, 0, encode(:class => klass, :args => args)) end destroyed end |
.encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
50 51 52 53 54 55 56 |
# File 'lib/resque_sqs/job.rb', line 50 def self.encode(object) if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.dump object else MultiJson.encode object end end |
Instance Method Details
#==(other) ⇒ Object
Equality
313 314 315 316 317 |
# File 'lib/resque_sqs/job.rb', line 313 def ==(other) queue == other.queue && payload_class == other.payload_class && args == other.args end |
#after_hooks ⇒ Object
327 328 329 |
# File 'lib/resque_sqs/job.rb', line 327 def after_hooks @after_hooks ||= Plugin.after_hooks(payload_class) end |
#args ⇒ Object
Returns an array of args represented in this job’s payload.
285 286 287 |
# File 'lib/resque_sqs/job.rb', line 285 def args @payload['args'] end |
#around_hooks ⇒ Object
323 324 325 |
# File 'lib/resque_sqs/job.rb', line 323 def around_hooks @around_hooks ||= Plugin.around_hooks(payload_class) end |
#before_hooks ⇒ Object
319 320 321 |
# File 'lib/resque_sqs/job.rb', line 319 def before_hooks @before_hooks ||= Plugin.before_hooks(payload_class) end |
#classify(dashed_word) ⇒ Object
Given a word with dashes, returns a camel cased version of it.
classify(‘job-name’) # => ‘JobName’
76 77 78 |
# File 'lib/resque_sqs/job.rb', line 76 def classify(dashed_word) dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join end |
#constantize(camel_cased_word) ⇒ Object
Tries to find a constant with the name specified in the argument string:
constantize(“Module”) # => Module constantize(“Test::Unit”) # => Test::Unit
The name is assumed to be the one of a top-level constant, no matter whether it starts with “::” or not. No lexical context is taken into account:
C = ‘outside’ module M
C = 'inside'
C # => 'inside'
constantize("C") # => 'outside', same as ::C
end
NameError is raised when the constant is unknown.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/resque_sqs/job.rb', line 97 def constantize(camel_cased_word) camel_cased_word = camel_cased_word.to_s if camel_cased_word.include?('-') camel_cased_word = classify(camel_cased_word) end names = camel_cased_word.split('::') names.shift if names.empty? || names.first.empty? constant = Object names.each do |name| args = Module.method(:const_get).arity != 1 ? [false] : [] if constant.const_defined?(name, *args) constant = constant.const_get(name) else constant = constant.const_missing(name) end end constant end |
#decode(object) ⇒ Object
Given a string, returns a Ruby object.
34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/resque_sqs/job.rb', line 34 def decode(object) return unless object begin if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.load object else MultiJson.decode object end rescue ::MultiJson::DecodeError => e raise DecodeException, e., e.backtrace end end |
#encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
25 26 27 28 29 30 31 |
# File 'lib/resque_sqs/job.rb', line 25 def encode(object) if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.dump object else MultiJson.encode object end end |
#fail(exception) ⇒ Object
Given an exception object, hands off the needed parameters to the Failure module.
291 292 293 294 295 296 297 298 |
# File 'lib/resque_sqs/job.rb', line 291 def fail(exception) run_failure_hooks(exception) Failure.create \ :payload => payload, :exception => exception, :worker => worker, :queue => queue end |
#failure_hooks ⇒ Object
331 332 333 |
# File 'lib/resque_sqs/job.rb', line 331 def failure_hooks @failure_hooks ||= Plugin.failure_hooks(payload_class) end |
#has_payload_class? ⇒ Boolean
278 279 280 281 282 |
# File 'lib/resque_sqs/job.rb', line 278 def has_payload_class? payload_class != Object rescue NameError false end |
#inspect ⇒ Object
String representation
307 308 309 310 |
# File 'lib/resque_sqs/job.rb', line 307 def inspect obj = @payload "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ] end |
#payload_class ⇒ Object
Returns the actual class constant represented in this job’s payload.
267 268 269 |
# File 'lib/resque_sqs/job.rb', line 267 def payload_class @payload_class ||= constantize(@payload['class']) end |
#payload_class_name ⇒ Object
Returns the payload class as a string without raising NameError
272 273 274 275 276 |
# File 'lib/resque_sqs/job.rb', line 272 def payload_class_name payload_class.to_s rescue NameError 'No Name' end |
#perform ⇒ Object
Attempts to perform the work represented by this job instance. Calls #perform on the class given in the payload with the arguments given in the payload.
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/resque_sqs/job.rb', line 209 def perform job = payload_class job_args = args || [] job_was_performed = false begin # Execute before_perform hook. Abort the job gracefully if # ResqueSqs::DontPerform is raised. begin before_hooks.each do |hook| job.send(hook, *job_args) end rescue DontPerform return false end # Execute the job. Do it in an around_perform hook if available. if around_hooks.empty? job.perform(*job_args) job_was_performed = true else # We want to nest all around_perform plugins, with the last one # finally calling perform stack = around_hooks.reverse.inject(nil) do |last_hook, hook| if last_hook lambda do job.send(hook, *job_args) { last_hook.call } end else lambda do job.send(hook, *job_args) do result = job.perform(*job_args) job_was_performed = true result end end end end stack.call end # Execute after_perform hook after_hooks.each do |hook| job.send(hook, *job_args) end # Return true if the job was performed return job_was_performed # If an exception occurs during the job execution, look for an # on_failure hook then re-raise. rescue Object => e run_failure_hooks(e) raise e end end |
#recreate ⇒ Object
Creates an identical job, essentially placing this job back on the queue.
302 303 304 |
# File 'lib/resque_sqs/job.rb', line 302 def recreate self.class.create(queue, payload_class, *args) end |
#run_failure_hooks(exception) ⇒ Object
335 336 337 338 339 340 341 342 343 344 |
# File 'lib/resque_sqs/job.rb', line 335 def run_failure_hooks(exception) begin job_args = args || [] if has_payload_class? failure_hooks.each { |hook| payload_class.send(hook, exception, *job_args) } unless @failure_hooks_ran end ensure @failure_hooks_ran = true end end |