Class: Resque::Job

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Includes:
Helpers
Defined in:
lib/resque/job.rb

Overview

A Resque::Job represents a unit of work. Each job lives on a single queue and has an associated payload object. The payload is a hash with two attributes: ‘class` and `args`. The `class` is the name of the Ruby class which should be used to run the job. The `args` are an array of arguments which should be passed to the Ruby class’s ‘perform` class-level method.

You can manually run a job using this code:

job = Resque::Job.reserve(:high)
klass = Resque::Job.constantize(job.payload['class'])
klass.perform(*job.payload['args'])

Constant Summary collapse

DontPerform =

Raise Resque::Job::DontPerform from a before_perform hook to abort the job.

Class.new(StandardError)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

classify, constantize, decode, encode, mongo_stats, mongo_workers

Constructor Details

#initialize(queue, payload) ⇒ Job

Returns a new instance of Job.



32
33
34
35
36
# File 'lib/resque/job.rb', line 32

def initialize(queue, payload)
  @queue = queue
  @payload = payload
  
end

Instance Attribute Details

#payloadObject (readonly)

This job’s associated payload object.



30
31
32
# File 'lib/resque/job.rb', line 30

def payload
  @payload
end

#queueObject (readonly)

The name of the queue from which this job was pulled (or is to be placed)



27
28
29
# File 'lib/resque/job.rb', line 27

def queue
  @queue
end

#workerObject

The worker object which is currently processing this job.



23
24
25
# File 'lib/resque/job.rb', line 23

def worker
  @worker
end

Class Method Details

.create(queue, klass, *args) ⇒ Object

Creates a job by placing it on a queue. Expects a string queue name, a string class name, and an optional array of arguments to pass to the class’ ‘perform` method.

Raises an exception if no queue or class is given.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/resque/job.rb', line 43

def self.create(queue, klass, *args)
  if !queue
    raise NoQueueError.new("Jobs must be placed onto a queue.")
  end

  if klass.to_s.empty?
    raise NoClassError.new("Jobs must be given a class.")
  end

  item = { :class => klass.to_s, :args => args}
        
  item[:_id] = args[0][:_id] if Resque.allows_unique_jobs(klass) && args[0].is_a?(Hash) && args[0].has_key?(:_id)
  item[:unique] = true if item[:_id]
  
  #are we trying to put a non-delayed job into a delayed queue?
  if Resque.queue_allows_delayed(queue)
    if Resque.allows_delayed_jobs(klass)
      if args[0].is_a?(Hash) && args[0].has_key?(:delay_until)
        item[:delay_until] = args[0][:delay_until]
      else
        raise QueueError.new 'trying to insert delayed job without delay_until'
      end
    else
      raise QueueError.new 'trying to insert non-delayed job into delayed queue'
    end
  else
    if Resque.allows_delayed_jobs(klass)
      raise QueueError.new 'trying to insert a delayed job into a non-delayed queue'
    end
  end

  #is it a hydra job?
  heads = klass.instance_variable_get(:@hydra)
  if heads
    if item[:_id]
      queue = (queue.to_s + (item[:_id].hash % heads).to_s).to_sym 
    else
      queue = (queue.to_s + rand(heads).to_s).to_sym     
    end
  end
  
  ret = Resque.push(queue, item)
  Plugin.after_enqueue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end
  ret
end

.destroy(queue, klass, *args) ⇒ Object

Removes a job from a queue. Expects a string queue name, a string class name, and, optionally, args.

Returns the number of jobs destroyed.

If no args are provided, it will remove all jobs of the class provided.

That is, for these two jobs:

{ ‘class’ => ‘UpdateGraph’, ‘args’ => [‘defunkt’] } { ‘class’ => ‘UpdateGraph’, ‘args’ => [‘mojombo’] }

The following call will remove both:

Resque::Job.destroy(queue, 'UpdateGraph')

Whereas specifying args will only remove the 2nd job:

Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')

This method can be potentially very slow and memory intensive, depending on the size of your queue, as it loads all jobs into a Ruby array before processing.



115
116
117
118
119
120
121
122
# File 'lib/resque/job.rb', line 115

def self.destroy(queue, klass, *args)
  collection = Resque.mongo[queue]
  selector = {'class' => klass.to_s}
  selector['args'] = args unless args.empty?
  destroyed = collection.find(selector).count
  collection.remove(selector, :safe => true)
  destroyed
end

.reserve(queue) ⇒ Object

Given a string queue name, returns an instance of Resque::Job if any jobs are available. If not, returns nil.



126
127
128
129
# File 'lib/resque/job.rb', line 126

def self.reserve(queue)
  return unless payload = Resque.pop(queue)
  new(queue, payload)
end

Instance Method Details

#==(other) ⇒ Object

Equality



229
230
231
232
233
# File 'lib/resque/job.rb', line 229

def ==(other)
  queue == other.queue &&
    payload_class == other.payload_class &&
    args == other.args
end

#argsObject

Returns an array of args represented in this job’s payload.



202
203
204
# File 'lib/resque/job.rb', line 202

def args
  @payload['args']
end

#fail(exception) ⇒ Object

Given an exception object, hands off the needed parameters to the Failure module.



208
209
210
211
212
213
214
# File 'lib/resque/job.rb', line 208

def fail(exception)
  Failure.create \
    :payload   => payload,
    :exception => exception,
    :worker    => worker,
    :queue     => queue
end

#inspectObject

String representation



223
224
225
226
# File 'lib/resque/job.rb', line 223

def inspect
  obj = @payload
  "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ]
end

#payload_classObject

Returns the actual class constant represented in this job’s payload.



197
198
199
# File 'lib/resque/job.rb', line 197

def payload_class
  @payload_class ||= constantize(@payload['class'])
end

#performObject

Attempts to perform the work represented by this job instance. Calls #perform on the class given in the payload with the arguments given in the payload.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/resque/job.rb', line 134

def perform
  job = payload_class
  job_args = args || []
  job_was_performed = false

  before_hooks  = Plugin.before_hooks(job)
  around_hooks  = Plugin.around_hooks(job)
  after_hooks   = Plugin.after_hooks(job)
  failure_hooks = Plugin.failure_hooks(job)

  begin
    # Execute before_perform hook. Abort the job gracefully if
    # Resque::DontPerform is raised.
    begin
      before_hooks.each do |hook|
        job.send(hook, *job_args)
      end
    rescue DontPerform
      return false
    end

    # Execute the job. Do it in an around_perform hook if available.
    if around_hooks.empty?
      job.perform(*job_args)
      job_was_performed = true
    else
      # We want to nest all around_perform plugins, with the last one
      # finally calling perform
      stack = around_hooks.reverse.inject(nil) do |last_hook, hook|
        if last_hook
          lambda do
            job.send(hook, *job_args) { last_hook.call }
          end
        else
          lambda do
            job.send(hook, *job_args) do
              result = job.perform(*job_args)
              job_was_performed = true
              result
            end
          end
        end
      end
      stack.call
    end

    # Execute after_perform hook
    after_hooks.each do |hook|
      job.send(hook, *job_args)
    end

    # Return true if the job was performed
    return job_was_performed

  # If an exception occurs during the job execution, look for an
  # on_failure hook then re-raise.
  rescue Object => e
    failure_hooks.each { |hook| job.send(hook, e, *job_args) }
    raise e
  end
end

#recreateObject

Creates an identical job, essentially placing this job back on the queue.



218
219
220
# File 'lib/resque/job.rb', line 218

def recreate
  self.class.create(queue, payload_class, *args)
end