Class: Ruote::Resque::Participant

Inherits:
Object
Includes:
LocalParticipant
Defined in:
lib/ruote/resque/participant.rb

Overview

A module to be included in your Resque jobs to be able to register them as participants.

Note that the participant should have a queue, set either via the @queue variable when the job class is accessible to Ruote, or via the :queue option on registration.

A resque participant implementation.

Examples:

Register a participant

# This is defined on a remote Resque worker
# You do not need to extend Participant in this case
class MyAwesomeJob
  extend Ruote::Resque::Job
  @queue = :my_queue

  def self.perform(workitem)
    workitem['fields']['awesome'] = true
  end
end

# Use it like this in your Ruote process
engine.register_participant 'be_awesome', Ruote::Resque::Participant, :class => 'MyAwesomeJob', :queue => :my_queue
# Or register it via the DSL
Ruote::Resque.register(dashboard) do
  be_awesome 'MyAwesomeJob', :my_queue
end

Constructor Details

#initialize(opts = {}) ⇒ Participant

Called with the options on engine.register_participant

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :class (#to_s) — default: self.class

    the job class to enqueue when called

  • :queue (#to_s) — default: Resque.queue_from_class(class)

    the queue to enqueue the job in

  • :forget (Boolean) — default: false

    if false, wait for the worker's reply; if true, reply as soon as the job is enqueued



# File 'lib/ruote/resque/participant.rb', line 36

def initialize(opts = {})

  @job_klass = opts.delete('class')
  @job_queue = opts.delete('queue')
  @should_forget = opts.delete('forget') || false

  # Called here to raise eventual exceptions on initialization
  ::Resque.validate(@job_klass, @job_queue)

end
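
The constructor reads its options with string keys and validates them immediately, so a bad registration fails at initialization rather than when the first workitem arrives. The following is a minimal sketch of that fail-fast pattern. SketchResque, SketchValidationError and SketchParticipant are hypothetical stand-ins so the snippet runs without Ruote or Resque installed; the stand-in does not model the lookup of a default queue from the job class.

```ruby
# Hypothetical stand-in for the validation step (not Resque's own code).
class SketchValidationError < StandardError; end

module SketchResque
  # Raises when the job class or the queue is missing.
  def self.validate(klass, queue)
    raise SketchValidationError, 'no job class given' if klass.to_s.empty?
    raise SketchValidationError, 'no queue given' if queue.to_s.empty?
    true
  end
end

# Hypothetical participant mirroring the shape of the constructor above.
class SketchParticipant
  attr_reader :job_klass, :job_queue, :should_forget

  def initialize(opts = {})
    # Options are read with string keys, as in the source listing.
    @job_klass = opts.delete('class')
    @job_queue = opts.delete('queue')
    @should_forget = opts.delete('forget') || false

    # Validate right away so errors surface on initialization.
    SketchResque.validate(@job_klass, @job_queue)
  end
end

ok = SketchParticipant.new('class' => 'MyAwesomeJob', 'queue' => 'my_queue')

error = begin
  SketchParticipant.new('class' => 'MyAwesomeJob') # queue is missing
  nil
rescue SketchValidationError => e
  e
end

puts ok.should_forget
puts error.message
```

In this sketch the second registration never produces a participant object, which is the behaviour the comment in the constructor source describes.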

Instance Method Details

#do_not_thread ⇒ true

Returns true because enqueuing a job in Resque is sufficiently fast to happen in the main thread.

Returns:

  • (true)


# File 'lib/ruote/resque/participant.rb', line 83

def do_not_thread
  true
end

#encode_workitem(workitem) ⇒ Hash

Returns a representation of a workitem that is suitable for use in Resque.

Parameters:

  • workitem (Ruote::Workitem)

Returns:

  • (Hash)

    the workitem as a hash



# File 'lib/ruote/resque/participant.rb', line 75

def encode_workitem(workitem)

  workitem.to_h

end
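
Resque serializes job arguments as JSON, so the hash produced here reaches the worker with string keys only. That is why the example job in the overview reads workitem['fields']['awesome'] rather than using symbols. The sketch below imitates that round trip with the standard json library; the workitem contents are made up for illustration and do not reflect the full structure of a real Ruote workitem.

```ruby
require 'json'

# Hypothetical workitem hash; a real one carries more entries.
workitem_hash = {
  'fields' => { :awesome => false, 'count' => 1 }
}

# Imitate the trip through the queue: encode on enqueue, decode in the worker.
wire = JSON.generate(workitem_hash)
received = JSON.parse(wire)

# The symbol key has become a string on the worker side.
puts received['fields'].keys.inspect
puts received['fields'].key?(:awesome)
```

A job that looks up fields by symbol would therefore find nothing, even if the process definition set them with symbols.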

#on_cancel ⇒ Boolean

Called when Ruote has to cancel an active workitem for this participant. Removes the job from the Resque queue.

Note that this method does nothing if the worker is already processing the job, or if the job has been processed but its reply has not.

Returns:

  • (Boolean)

    whether the job was deleted or not.



# File 'lib/ruote/resque/participant.rb', line 65

def on_cancel

  payload = encode_workitem(applied_workitem)
  ::Resque::Job.destroy(@job_queue, @job_klass, payload)

end
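
Cancellation can only remove a job that is still waiting in the queue. The sketch below models that with an in-memory queue: a job is matched by class name and payload, and once a worker has popped it there is nothing left to destroy. SketchQueue is a hypothetical stand-in, not Resque's implementation, and its return value (a count of removed jobs) is a choice made for this illustration.

```ruby
# Hypothetical in-memory queue used only to illustrate cancellation.
class SketchQueue
  def initialize
    @jobs = []
  end

  # Enqueue a job as a class name plus a single payload argument.
  def push(klass, payload)
    @jobs << { 'class' => klass.to_s, 'args' => [payload] }
  end

  # A worker takes the oldest job off the queue.
  def pop
    @jobs.shift
  end

  # Remove queued jobs matching class and payload; return how many were removed.
  def destroy(klass, payload)
    before = @jobs.size
    @jobs.reject! { |job| job['class'] == klass.to_s && job['args'] == [payload] }
    before - @jobs.size
  end
end

payload = { 'fields' => { 'awesome' => false } }

queue = SketchQueue.new
queue.push('MyAwesomeJob', payload)
removed_while_waiting = queue.destroy('MyAwesomeJob', payload)

queue.push('MyAwesomeJob', payload)
queue.pop # a worker has picked the job up
removed_after_pickup = queue.destroy('MyAwesomeJob', payload)

puts removed_while_waiting
puts removed_after_pickup
```

Because matching depends on the payload, the cancel path has to encode the applied workitem the same way the enqueue path encoded it.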

#on_workitem ⇒ void

This method returns an undefined value.

Called when the participant is handed a workitem. Enqueues the job to Resque.



# File 'lib/ruote/resque/participant.rb', line 50

def on_workitem

  payload = encode_workitem(workitem)
  ::Resque::Job.create(@job_queue, @job_klass, payload)

  # With :forget set, reply right away instead of waiting for the worker
  reply if @should_forget

end
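
The effect of the :forget option on this method can be sketched with a small stand-in. SketchForgetParticipant is hypothetical: it records what would be enqueued and whether a reply was sent, instead of calling Resque or Ruote.

```ruby
# Hypothetical stand-in showing when the participant replies on its own.
class SketchForgetParticipant
  attr_reader :enqueued, :replied

  def initialize(forget)
    @should_forget = forget
    @enqueued = []
    @replied = false
  end

  # Stand-in for handing the workitem back to the engine.
  def reply
    @replied = true
  end

  # Always enqueue; reply immediately only when forget is set.
  def on_workitem(payload)
    @enqueued << payload
    reply if @should_forget
  end
end

waiting = SketchForgetParticipant.new(false)
waiting.on_workitem('fields' => {})

forgetting = SketchForgetParticipant.new(true)
forgetting.on_workitem('fields' => {})

puts waiting.replied
puts forgetting.replied
```

In both cases the job is enqueued. Only the forgetting participant lets the process continue without waiting for the worker.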