Module: Ruote::LocalParticipant
- Includes: ReceiverMixin
- Included in: BlockParticipant, CodeParticipant, EngineParticipant, NoOpParticipant, NullParticipant, RevParticipant, SmtpParticipant, StorageParticipant
- Defined in: lib/ruote/part/local_participant.rb
Overview
Provides methods for ‘local’ participants.
Assumes the class that includes this module has a #context method that points to the worker or engine ruote context.
It’s “local” because it has access to the ruote storage.
Instance Attribute Summary
- #context ⇒ Object
  The ruote context (worker or engine). Methods such as reply_to_engine, included from ReceiverMixin, rely on it.
Instance Method Summary
- #re_dispatch(workitem, opts = {}) ⇒ Object (also: #reject)
  Use this method to re_dispatch the workitem.
- #unschedule_re_dispatch(fei) ⇒ Object
  Cancels the scheduled re_dispatch, if any.
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply, #reply_to_engine, #sign
Instance Attribute Details
#context ⇒ Object
The ruote context (worker or engine). Methods such as reply_to_engine, included from ReceiverMixin, rely on it.
# File 'lib/ruote/part/local_participant.rb', line 43
def context
  @context
end
Instance Method Details
#re_dispatch(workitem, opts = {}) ⇒ Object Also known as: reject
Use this method to re_dispatch the workitem.
It takes two options, :in and :at, for a “later re_dispatch”: :in gives a delay and :at gives a point in time.
Without either option, the method acts as a “reject”: the workitem is put back on the dispatch queue immediately.
See the unschedule_re_dispatch method for an example participant implementation that uses re_dispatch.
# File 'lib/ruote/part/local_participant.rb', line 54
def re_dispatch(workitem, opts={})
  msg = {
    'action' => 'dispatch',
    'fei' => workitem.h.fei,
    'workitem' => workitem.h,
    'participant_name' => workitem.participant_name,
    'rejected' => true
  }
  if t = opts[:in] || opts[:at]
    sched_id = @context.storage.put_schedule('at', workitem.h.fei, t, msg)
    fexp = fetch_flow_expression(workitem)
    fexp.h['re_dispatch_sched_id'] = sched_id
    fexp.try_persist
  else
    @context.storage.put_msg('dispatch', msg)
  end
end
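The three ways of calling re_dispatch can be sketched without a running engine. The snippet below is only an illustration: RecordingStorage, SketchParticipant and demo_re_dispatch are hypothetical stand-ins, not part of ruote, and the workitem is a plain hash. The stand-in copies the branching of the listing and leaves out the flow expression bookkeeping. The :at time format is an assumption.

```ruby
# Hypothetical storage stand-in: it only records the calls it receives.
class RecordingStorage
  attr_reader :log

  def initialize
    @log = []
  end

  def put_schedule(flavour, fei, time, msg)
    @log << [:put_schedule, flavour, time]
    "sched-#{@log.size}" # made-up schedule id
  end

  def put_msg(action, msg)
    @log << [:put_msg, action, msg['rejected']]
  end
end

# Stand-in participant. It copies only the branching of re_dispatch and
# skips the flow expression bookkeeping of the real method.
class SketchParticipant
  def initialize(storage)
    @storage = storage
  end

  def re_dispatch(workitem, opts = {})
    msg = {
      'action' => 'dispatch',
      'fei' => workitem['fei'],
      'workitem' => workitem,
      'rejected' => true
    }
    if (t = opts[:in] || opts[:at])
      @storage.put_schedule('at', workitem['fei'], t, msg)
    else
      @storage.put_msg('dispatch', msg)
    end
  end
end

def demo_re_dispatch
  storage = RecordingStorage.new
  participant = SketchParticipant.new(storage)
  workitem = { 'fei' => 'fei-0', 'fields' => {} } # plain hash, not a Ruote::Workitem
  participant.re_dispatch(workitem, :in => '1s')                  # relative delay
  participant.re_dispatch(workitem, :at => '2030-01-01 00:00:00') # assumed time format
  participant.re_dispatch(workitem)                               # immediate "reject"
  storage.log
end
```

Both :in and :at end up in a put_schedule call, while the call without options goes straight to put_msg, which is why it behaves as a reject.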
#unschedule_re_dispatch(fei) ⇒ Object
Cancels the scheduled re_dispatch, if any.
An example of a ‘retrying participant’:
class RetryParticipant
  include Ruote::LocalParticipant
  def initialize(opts)
    @opts = opts
  end
  def consume(workitem)
    begin
      do_the_job
      reply(workitem)
    rescue
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end
  def cancel(fei, flavour)
    unschedule_re_dispatch(fei)
  end
end
Note how unschedule_re_dispatch is used in the cancel method. Warning: this example could loop forever, because it re-dispatches on every failure and nothing caps the number of retries.
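One possible way to avoid the endless loop is to count attempts in the workitem fields and give up after a limit. This is a sketch under assumptions, not the library's own recipe: StubLocalParticipant and StubWorkitem are hypothetical stand-ins so the code runs without ruote (a real participant would include Ruote::LocalParticipant), the 'max_attempts' option and the 'retry_count' and 'retry_error' field names are made up, and the job is passed as a block for illustration. It also assumes that changes to the fields travel with the re-dispatched workitem, which the re_dispatch source suggests since it puts workitem.h into the message.

```ruby
# Stand-in for Ruote::LocalParticipant so the sketch runs without ruote.
# It only records which of the three methods was called.
module StubLocalParticipant
  def calls
    @calls ||= []
  end

  def reply(workitem)
    calls << [:reply, workitem]
  end

  def re_dispatch(workitem, opts = {})
    calls << [:re_dispatch, opts]
  end

  def unschedule_re_dispatch(fei)
    calls << [:unschedule_re_dispatch, fei]
  end
end

# Minimal workitem stand-in: only the fields hash is used here.
StubWorkitem = Struct.new(:fields)

class BoundedRetryParticipant
  include StubLocalParticipant

  def initialize(opts, &job)
    @opts = opts
    @job = job # hypothetical: the work to attempt, passed as a block
  end

  def consume(workitem)
    @job.call(workitem)
    reply(workitem)
  rescue StandardError => e
    # Count the failed attempt in the workitem itself.
    attempts = workitem.fields['retry_count'].to_i + 1
    workitem.fields['retry_count'] = attempts
    if attempts < (@opts['max_attempts'] || 3)
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    else
      # Limit reached: record the error and hand the workitem back.
      workitem.fields['retry_error'] = e.message
      reply(workitem)
    end
  end

  def cancel(fei, flavour)
    unschedule_re_dispatch(fei)
  end
end
```

Replying with an error field is only one choice; a process definition could then branch on that field. Letting the exception propagate after the last attempt would be another.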
# File 'lib/ruote/part/local_participant.rb', line 106
def unschedule_re_dispatch(fei)
  fexp = Ruote::Exp::FlowExpression.fetch(
    @context, Ruote::FlowExpressionId.extract_h(fei))
  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end
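The way re_dispatch and unschedule_re_dispatch cooperate through the 're_dispatch_sched_id' key can be illustrated with in-memory stand-ins. Everything below (MemoryStorage, FakeExpression, RoundTrip, the format of the schedule id) is hypothetical and exists only to make the round trip runnable; the real methods go through the ruote storage and flow expressions.

```ruby
# Hypothetical in-memory storage; not the ruote storage API.
class MemoryStorage
  attr_reader :schedules

  def initialize
    @schedules = {}
  end

  # Returns a made-up schedule id, as put_schedule is assumed to do.
  def put_schedule(flavour, fei, time, msg)
    id = "#{flavour}-#{fei}-#{time}"
    @schedules[id] = msg
    id
  end

  def delete_schedule(id)
    @schedules.delete(id)
  end
end

# Stand-in for a flow expression: just the persisted hash.
FakeExpression = Struct.new(:h)

class RoundTrip
  def initialize(storage)
    @storage = storage
    @expressions = Hash.new { |hash, fei| hash[fei] = FakeExpression.new({}) }
  end

  # Like the scheduled branch of re_dispatch: remember the schedule id.
  def re_dispatch_later(fei, delay)
    id = @storage.put_schedule('at', fei, delay, { 'action' => 'dispatch' })
    @expressions[fei].h['re_dispatch_sched_id'] = id
  end

  # Like unschedule_re_dispatch: delete the schedule only if an id was stored.
  def unschedule_re_dispatch(fei)
    if (s = @expressions[fei].h['re_dispatch_sched_id'])
      @storage.delete_schedule(s)
    end
  end
end
```

When no re_dispatch was scheduled for an expression, the key is absent and the cancel is a no-op, which matches the "if any" in the method description.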