Class: Ruote::Exp::ParticipantExpression

Inherits:
FlowExpression
Defined in:
lib/ruote/exp/fe_participant.rb

Overview

The ‘participant’ expression is very special. It sits on the fence between the engine and the external world.

The participant expression is used to pass workitems from the engine to participants. Participants are usually bound to the engine at start time via its register_participant method.

Here’s an example of two concurrent participant expressions in use:

concurrence do
  participant :ref => 'alice'
  participant :ref => 'bob'
end

Upon encountering the two expressions, the engine will look up their names in the participant map and hand the workitems to the participant instances registered for those names.
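A toy version of that lookup, in plain Ruby and without the engine, might look as follows. The `participants` hash and the `hand_over` helper are hypothetical stand-ins for the engine's participant map; the error message mirrors the one raised by #apply when no participant is found.

```ruby
# Toy participant map (hypothetical, not ruote's participant list):
# each name maps to a callable that receives the workitem.
participants = {
  'alice' => ->(workitem) { "alice got #{workitem['fields']['task']}" },
  'bob'   => ->(workitem) { "bob got #{workitem['fields']['task']}" }
}

# Look the name up and hand the workitem over, or fail like #apply does
# when no participant is registered under that name.
def hand_over(participants, name, workitem)
  participant = participants[name]
  raise ArgumentError, "no participant named #{name.inspect}" if participant.nil?
  participant.call(workitem)
end

workitem = { 'fields' => { 'task' => 'review' } }
results = ['alice', 'bob'].map { |name| hand_over(participants, name, workitem) }
# results => ["alice got review", "bob got review"]
```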

attributes passed as arguments

All the attributes passed to a participant will be fed to the outgoing workitem under a new ‘params’ field.

Thus, with

participant :ref => 'alice', :task => 'mow the lawn', :timeout => '2d'

Alice will receive a workitem with a ‘params’ field set to

{ :ref => 'alice', :task => 'mow the lawn', :timeout => '2d' }

The ‘params’ field is deleted before the workitem resumes in the flow, that is, when the engine replies to the parent expression of this participant expression.
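The lifecycle of the ‘params’ field can be sketched in plain Ruby, without the engine. The helpers `dispatch_fields` and `fields_for_parent` are hypothetical; they only mimic what #apply and #reply_to_parent do to the workitem fields.

```ruby
# Hypothetical helpers, not part of ruote.

# Before dispatch: the expression attributes are stored under 'params'.
def dispatch_fields(fields, attributes)
  fields.merge('params' => attributes) # merge returns a copy
end

# Before replying to the parent: 'params' is dropped again.
def fields_for_parent(fields)
  fields.reject { |key, _| key == 'params' }
end

atts = { :ref => 'alice', :task => 'mow the lawn', :timeout => '2d' }

outgoing = dispatch_fields({ 'customer' => 'acme' }, atts)
returned = fields_for_parent(outgoing)

# outgoing['params'] => the attributes hash seen by the participant
# returned           => { 'customer' => 'acme' }
```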

simplified participant notation

This process definition is equivalent to the concurrence example above, with less to write.

concurrence do
  participant 'alice'
  bob
end

Please note that ‘bob’ alone could stand for the participant ‘bob’ or the subprocess named ‘bob’. Subprocesses take precedence over participants (if there is both a subprocess named ‘bob’ and a participant named ‘bob’, the subprocess is used).
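The precedence rule can be illustrated with a small resolver. `resolve_bare_name` is a hypothetical helper, not the engine's actual lookup code; it only shows the order in which the two kinds of names are considered.

```ruby
# Hypothetical resolver for a bare name such as `bob`.
# Subprocesses are checked first, then participants.
def resolve_bare_name(name, subprocesses, participants)
  return [:subprocess, name] if subprocesses.include?(name)
  return [:participant, name] if participants.include?(name)
  nil # unknown name; what the engine does here is outside this sketch
end

subprocesses = ['bob']
participants = ['alice', 'bob']

bob   = resolve_bare_name('bob', subprocesses, participants)
alice = resolve_bare_name('alice', subprocesses, participants)
# bob   => [:subprocess, "bob"]
# alice => [:participant, "alice"]
```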

participant defined timeout

Usually, timeouts are given for an expression in the process definition.

participant 'alice', :timeout => '2d'

where alice has two days to complete her task (send back the workitem).

But it’s OK for participant classes registered in the engine to provide their own timeout value. The participant instance simply has to respond to the #rtimeout method and return a meaningful timeout value (like a number of seconds, or a string like “2d” or “1M2w”).

Note, however, that the process definition timeout (if any) will take precedence over the one specified by the participant.
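The precedence between the two timeout sources can be sketched as follows. `SlowParticipant` and `effective_timeout` are hypothetical names, and #rtimeout is shown without arguments as an assumption; the exact signature the engine expects is not stated here.

```ruby
# Hypothetical participant that provides its own timeout.
class SlowParticipant
  def rtimeout
    '1d'
  end
end

# Hypothetical helper: the process definition timeout wins, the
# participant's #rtimeout is only a fallback.
def effective_timeout(definition_timeout, participant)
  return definition_timeout if definition_timeout
  participant.respond_to?(:rtimeout) ? participant.rtimeout : nil
end

from_definition  = effective_timeout('2d', SlowParticipant.new)
from_participant = effective_timeout(nil, SlowParticipant.new)
no_timeout       = effective_timeout(nil, Object.new)
# from_definition  => "2d"
# from_participant => "1d"
# no_timeout       => nil
```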

asynchronous

The expression will make sure to dispatch to the participant in an asynchronous way. This means that the dispatch will occur in a dedicated thread.

Since the dispatching to the participant could take a long time and block the engine for too long, this ‘do thread’ policy is used by default.

If the participant itself responds to the method #do_not_thread and returns a positive answer, a new thread (or a next_tick) won’t get used. This is practical for tiny participants that don’t do IO and reply immediately (after a few operations). By default, BlockParticipant instances do not thread.
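A minimal sketch of this policy, in plain Ruby, is shown below. The `dispatch` helper, the `handle` method and both classes are hypothetical; only the #do_not_thread check reflects the behaviour described above, and it is called without arguments as an assumption.

```ruby
# Hypothetical participant that records which thread handled the workitem.
class RecordingParticipant
  attr_reader :ran_in

  def handle(workitem)
    @ran_in = Thread.current
  end
end

# Hypothetical tiny participant that opts out of threading.
class TinyParticipant < RecordingParticipant
  def do_not_thread
    true
  end
end

# Dispatch inline when the participant asks for it, otherwise in a
# dedicated thread. Returns the thread, or nil for inline dispatch.
def dispatch(participant, workitem)
  if participant.respond_to?(:do_not_thread) && participant.do_not_thread
    participant.handle(workitem) # runs in the caller's thread
    nil
  else
    Thread.new { participant.handle(workitem) }
  end
end

tiny = TinyParticipant.new
inline_result = dispatch(tiny, {})

slow = RecordingParticipant.new
thread = dispatch(slow, {})
thread.join # wait for the dedicated thread to finish
```

In this sketch `tiny` is handled in the calling thread, while `slow` is handled in the thread returned by `dispatch`.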

Constant Summary

Constants inherited from FlowExpression

FlowExpression::COMMON_ATT_KEYS

Instance Attribute Summary

Attributes inherited from FlowExpression

#context, #error, #h

Instance Method Summary

Methods inherited from FlowExpression

#ancestor?, #att, #attribute, #attribute_text, #attributes, #compile_atts, #compile_variables, do_action, #do_apply, #do_cancel, #do_fail, #do_persist, #do_reply, #do_unpersist, #expand_atts, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_on_error, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #persist_or_raise, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables

Methods included from WithMeta

#class_def, included

Methods included from WithH

included

Constructor Details

This class inherits a constructor from Ruote::Exp::FlowExpression

Instance Method Details

#apply ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/ruote/exp/fe_participant.rb', line 126

def apply

  #
  # determine participant

  h.participant_name = (attribute(:ref) || attribute_text).to_s

  raise ArgumentError.new(
    "no participant name specified"
  ) if h.participant_name == ''

  h.participant ||=
    @context.plist.lookup_info(h.participant_name, h.applied_workitem)

  raise(ArgumentError.new(
    "no participant named #{h.participant_name.inspect}")
  ) if h.participant.nil?

  #
  # dispatch to participant

  h.applied_workitem['participant_name'] = h.participant_name

  h.applied_workitem['fields']['params'] = compile_atts
  h.applied_workitem['fields'].delete('t')

  schedule_timeout(h.participant)

  persist_or_raise

  @context.storage.put_msg(
    'dispatch',
    'fei' => h.fei,
    'participant_name' => h.participant_name,
    'participant' => h.participant,
    'workitem' => h.applied_workitem)
end

#cancel(flavour) ⇒ Object



# File 'lib/ruote/exp/fe_participant.rb', line 164

def cancel(flavour)

  return reply_to_parent(h.applied_workitem) unless h.participant_name
    # no participant, reply immediately

  do_persist || return
    #
    # if do_persist returns false, it means we're operating on stale
    # data and cannot continue

  @context.storage.put_msg(
    'dispatch_cancel',
    'fei' => h.fei,
    'participant_name' => h.participant_name,
    'participant' => h.participant,
    'flavour' => flavour,
    'workitem' => h.applied_workitem)
end

#reply(workitem) ⇒ Object



# File 'lib/ruote/exp/fe_participant.rb', line 183

def reply(workitem)

  pinfo =
    h.participant ||
    @context.plist.lookup_info(h.participant_name, workitem)

  pa = @context.plist.instantiate(pinfo, :if_respond_to? => :on_reply)

  pa.on_reply(Ruote::Workitem.new(workitem)) if pa

  super(workitem)
end

#reply_to_parent(workitem) ⇒ Object



# File 'lib/ruote/exp/fe_participant.rb', line 196

def reply_to_parent(workitem)

  workitem['fields'].delete('params')
  workitem['fields'].delete('dispatched_at')
  super(workitem)
end