Module: Ruote::LocalParticipant

Overview

Provides methods for ‘local’ participants.

Assumes the class that includes this module has a #context method that points to the worker or engine ruote context.

It’s “local” because it has access to the ruote storage.

Methods included from ReceiverMixin

#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply, #reply_to_engine, #sign

Instance Attribute Details

#context ⇒ Object

Returns the ruote context (worker or engine). The reply_to_engine method comes from the included ReceiverMixin.

# File 'lib/ruote/part/local_participant.rb', line 43

def context
  @context
end

Instance Method Details

#re_dispatch(workitem, opts = {}) ⇒ Object Also known as: reject

Use this method to re-dispatch the workitem to its participant.

It accepts two options, :in and :at, for a “later re_dispatch”. Without either option, the workitem is re-dispatched immediately and the method acts as a “reject”.

Look at the unschedule_re_dispatch method for an example of a participant implementation that uses re_dispatch.


# File 'lib/ruote/part/local_participant.rb', line 54

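def re_dispatch(workitem, opts={})

  # message asking for the workitem to be dispatched to its participant again
  msg = {
    'action' => 'dispatch',
    'fei' => workitem.h.fei,
    'workitem' => workitem.h,
    'participant_name' => workitem.participant_name,
    'rejected' => true
  }

  if t = opts[:in] || opts[:at]

    # later re_dispatch: store the message as a schedule and remember the
    # schedule id in the expression so that it can be unscheduled
    sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg)

    fexp = fetch_flow_expression(workitem)
    fexp.h['re_dispatch_sched_id'] = sched_id
    fexp.try_persist

  else

    # no :in or :at, immediate re_dispatch ("reject")
    @context.storage.put_msg('dispatch', msg)
  end
end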
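
The two branches in the listing above can be tried out without a running engine. What follows is a minimal sketch, not ruote code: FakeStorage and dispatch_or_schedule are hypothetical stand-ins that only mirror the branching in re_dispatch (schedule when :in or :at is given, otherwise put a 'dispatch' message right away), and the schedule ids they return are made up.

```ruby
# Hypothetical in-memory stand-in for the ruote storage; it only records calls.
class FakeStorage
  attr_reader :msgs, :schedules

  def initialize
    @msgs = []
    @schedules = {}
  end

  # Same argument shape as the put_msg call in the listing above.
  def put_msg(action, msg)
    @msgs << msg.merge('action' => action)
  end

  # Same argument shape as the put_schedule call; the returned id is invented.
  def put_schedule(flavour, fei, time, msg)
    id = "#{flavour}-#{@schedules.size}"
    @schedules[id] = { 'fei' => fei, 'time' => time, 'msg' => msg }
    id
  end
end

# Hypothetical helper reproducing the decision taken in re_dispatch.
# Returns the schedule id for a later re_dispatch, nil for an immediate one.
def dispatch_or_schedule(storage, fei, opts = {})
  msg = { 'action' => 'dispatch', 'fei' => fei, 'rejected' => true }
  if (t = opts[:in] || opts[:at])
    storage.put_schedule('at', fei, t, msg)
  else
    storage.put_msg('dispatch', msg)
    nil
  end
end

storage = FakeStorage.new
dispatch_or_schedule(storage, 'fei-0', :in => '10m') # scheduled for later
dispatch_or_schedule(storage, 'fei-1')               # immediate "reject"
```

In the sketch, only the first call leaves an entry in schedules; the second one goes straight to msgs.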

#unschedule_re_dispatch(fei) ⇒ Object

Cancels the scheduled re_dispatch, if any.

An example of a ‘retrying participant’:

class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def consume(workitem)
    begin
      do_the_job # placeholder for the actual work
      reply(workitem)
    rescue
      # the job failed, try again after the configured delay
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end

  def cancel(fei, flavour)
    # drop the pending re_dispatch, if any
    unschedule_re_dispatch(fei)
  end
end

Note how unschedule_re_dispatch is used in the cancel method. Warning: this example has no retry limit, so it loops forever if do_the_job keeps failing.
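
One way to avoid the endless loop is to count attempts in the workitem and give up after a limit. The following is a sketch under assumptions, not part of ruote: the 'retry_count' and 'error' field names and the 'max_retries' option are hypothetical, and StubWorkitem and StubParticipant replace the ruote workitem and Ruote::LocalParticipant so that the snippet runs on its own. It also assumes that modified fields travel with the re-dispatched workitem, which the 'workitem' => workitem.h entry in the re_dispatch message suggests.

```ruby
# Stand-in for a ruote workitem: just a bag of fields (assumption).
StubWorkitem = Struct.new(:fields)

# Stand-in for Ruote::LocalParticipant; records calls instead of using storage.
module StubParticipant
  def calls
    @calls ||= []
  end

  def reply(workitem)
    calls << [:reply, workitem.fields.dup]
  end

  def re_dispatch(workitem, opts = {})
    calls << [:re_dispatch, opts]
  end
end

class BoundedRetryParticipant
  include StubParticipant

  def initialize(opts, &job)
    @opts = opts
    @job = job # the work to attempt; plays the role of do_the_job
  end

  def consume(workitem)
    @job.call(workitem)
    reply(workitem)
  rescue StandardError => e
    # Count this failed attempt in the workitem itself.
    count = workitem.fields['retry_count'].to_i + 1
    workitem.fields['retry_count'] = count
    if count > (@opts['max_retries'] || 3)
      # Limit exceeded: record the failure and hand the workitem back.
      workitem.fields['error'] = e.message
      reply(workitem)
    else
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end
end

participant = BoundedRetryParticipant.new('max_retries' => 2) { |_wi| raise 'boom' }
workitem = StubWorkitem.new({})
3.times { participant.consume(workitem) }
# participant.calls now holds two :re_dispatch entries followed by one :reply
```

Replying with an 'error' field is only one possible way to give up; letting the exception propagate so that the engine records an error is another.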


# File 'lib/ruote/part/local_participant.rb', line 106

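def unschedule_re_dispatch(fei)

  # fetch the expression identified by fei
  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))

  # delete the schedule recorded by re_dispatch, if there is one
  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end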
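
The bookkeeping that links re_dispatch and unschedule_re_dispatch is the 're_dispatch_sched_id' entry kept on the expression. A minimal sketch of that round trip, assuming an in-memory storage: TinyStorage, schedule_retry and unschedule_retry are hypothetical names, and a plain hash plays the role of fexp.h.

```ruby
# Hypothetical storage that keeps schedules in a hash.
class TinyStorage
  attr_reader :schedules

  def initialize
    @schedules = {}
    @next_id = 0
  end

  # Stores the message and returns an invented schedule id.
  def put_schedule(flavour, fei, time, msg)
    @next_id += 1
    id = "#{flavour}-#{time}-#{@next_id}"
    @schedules[id] = msg
    id
  end

  def delete_schedule(id)
    @schedules.delete(id)
  end
end

# Remember the schedule id on the expression, as re_dispatch does.
def schedule_retry(storage, expression, fei, time)
  msg = { 'action' => 'dispatch', 'fei' => fei }
  expression['re_dispatch_sched_id'] = storage.put_schedule('at', fei, time, msg)
end

# Delete the remembered schedule, if any, as unschedule_re_dispatch does.
def unschedule_retry(storage, expression)
  if (s = expression['re_dispatch_sched_id'])
    storage.delete_schedule(s)
  end
end

storage = TinyStorage.new
expression = {}
schedule_retry(storage, expression, 'fei-0', '1s')
unschedule_retry(storage, expression)
# storage.schedules is empty again; calling unschedule_retry on an
# expression without the entry does nothing
```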