Class: Ruote::Exp::ConcurrenceExpression

Inherits:
FlowExpression show all
Includes:
MergeMixin
Defined in:
lib/ruote/exp/fe_concurrence.rb

Overview

The ‘concurrence’ expression applies its child branches in parallel (well it makes a best effort to make them run in parallel).

concurrence do
  alpha
  bravo
end

attributes

The concurrence expression takes a number of attributes that allow for sophisticated control (especially at merge time).

:count

concurrence :count => 1 do
  alpha
  bravo
end

in that example, the concurrence will terminate as soon as 1 (count) of the branches replies. The other branch will get cancelled.

:remaining

As said for :count, the remaining branches get cancelled. By setting :remaining to :forget (or ‘forget’), the remaining branches will continue their execution, forgotten.

concurrence :count => 1, :remaining => :forget do
  alpha
  bravo
end

:merge

By default, the workitems override each others. By default, the first workitem to reply will win.

sequence do
  concurrence do
    alpha
    bravo
  end
  charly
end

In that example, if ‘alpha’ replied first, the workitem that reaches ‘charly’ once ‘bravo’ replied will have the payload as seen/modified by ‘alpha’.

The :merge attribute determines which branch wins the merge.

  • first (default)

  • last

  • highest

  • lowest

highest and lowest refer to the position in the list of branch. It’s useful to set a fixed winner.

concurrence :merge => :highest do
  alpha
  bravo
end

makes sure that alpha’s version of the workitem wins.

:merge_type

:override

By default, the merge type is set to ‘override’, which means that the ‘winning’ workitem’s payload supplants all other workitems’ payloads.

:mix

Setting :merge_type to :mix, will actually attempt to merge field by field, making sure that the field value of the winner(s) are used.

:isolate

:isolate will rearrange the resulting workitem payload so that there is a new field for each branch. The name of each field is the index of the branch from ‘0’ to …

:stack

:stack will stack the workitems coming back from the concurrence branches in an array whose order is determined by the :merge attributes. The array is placed in the ‘stack’ field of the resulting workitem. Note that the :stack merge_type also creates a ‘stack_attributes’ field and populates it with the expanded attributes of the concurrence.

Thus

sequence do
  concurrence :merge => :highest, :merge_type => :stack do
    reviewer1
    reviewer2
  end
  editor
end

will see the ‘editor’ receive a workitem whose fields look like :

{ 'stack' => [{ ... reviewer1 fields ... }, { ... reviewer2 fields ... }],
  'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } }

This could prove useful for participant having to deal with multiple merge strategy results.

:union

Will override atomic fields, concat arrays and merge hashes…

The union of those two workitems

{ 'a' => 0, 'b' => [ 'x' ], 'c' => { 'aa' => 'bb' }
{ 'a' => 1, 'b' => [ 'y' ], 'c' => { 'cc' => 'dd' }

will be

{ 'a' => 1, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb', 'cc' => 'dd' } }

:over_if (and :over_unless)

Like the :count attribute controls how many branches have to reply before a concurrence ends, the :over attribute is used to specify a condition upon which the concurrence will [prematurely] end.

concurrence :over_if => '${f:over}'
  alpha
  bravo
  charly
end

will end the concurrence as soon as one of the branches replies with a workitem whose field ‘over’ is set to true. (the remaining branches will get cancelled unless :remaining => :forget is set).

:over_unless needs no explanation.

Direct Known Subclasses

ConcurrentIteratorExpression

Constant Summary

Constants inherited from FlowExpression

FlowExpression::COMMON_ATT_KEYS

Instance Attribute Summary

Attributes inherited from FlowExpression

#context, #error, #h

Instance Method Summary collapse

Methods included from MergeMixin

#merge_workitem, #merge_workitems

Methods inherited from FlowExpression

#ancestor?, #att, #attribute, #attribute_text, #attributes, #cancel, #compile_atts, #compile_variables, do_action, #do_apply, #do_cancel, #do_fail, #do_pause, #do_persist, #do_reply, #do_resume, #do_unpersist, #expand_atts, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_on_error, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #persist_or_raise, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables

Methods included from WithMeta

#class_def, included

Methods included from WithH

included

Constructor Details

This class inherits a constructor from Ruote::Exp::FlowExpression

Instance Method Details

#applyObject



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/ruote/exp/fe_concurrence.rb', line 182

def apply

  h.ccount = attribute(:count).to_i rescue 0
  h.ccount = nil if h.ccount < 1

  h.cmerge = att(:merge, %w[ first last highest lowest ])
  h.cmerge_type = att(:merge_type, %w[ override mix isolate stack union ])
  h.remaining = att(:remaining, %w[ cancel forget ])

  h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}

  h.over = false

  apply_children
end

#reply(workitem) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/ruote/exp/fe_concurrence.rb', line 198

def reply(workitem)

  workitem = Ruote.fulldup(workitem)
    #
    # since workitem field merging might happen, better to work on
    # a copy of the workitem (so that history, coming afterwards,
    # doesn't see a modified version of the workitem)

  if h.cmerge == 'first' || h.cmerge == 'last'
    h.workitems << workitem
  else
    h.workitems[workitem['fei']['expid']] = workitem
  end

  over = h.over
  h.over = over || over?(workitem)

  if (not over) && h.over
    # just became 'over'

    reply_to_parent(nil)

  elsif h.children.empty?

    do_unpersist || return

    @context.storage.put_msg(
      'ceased',
      'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem)
  else

    do_persist
  end
end