Class: Ruote::EngineParticipant
- Inherits:
-
Object
- Object
- Ruote::EngineParticipant
- Includes:
- LocalParticipant
- Defined in:
- lib/ruote/part/engine_participant.rb
Overview
A participant for pushing the execution of [segments of] processes to other engines.
It works by giving the participant the connection information to the storage of the other engine.
For instance :
engine0 =
Ruote::Engine.new(
Ruote::Worker.new(
Ruote::FsStorage.new('work0', 'engine_id' => 'engine0')))
engine1 =
Ruote::Engine.new(
Ruote::Worker.new(
Ruote::FsStorage.new('work1', 'engine_id' => 'engine1')))
engine0.register_participant('engine1',
Ruote::EngineParticipant,
'storage_class' => Ruote::FsStorage,
'storage_path' => 'ruote/storage/fs_storage',
'storage_args' => 'work1')
engine1.register_participant('engine0',
Ruote::EngineParticipant,
'storage_class' => Ruote::FsStorage,
'storage_path' => 'ruote/storage/fs_storage',
'storage_args' => 'work0')
In this example, two engines are created (note that their ‘engine_id’ is explicitely set (else it would default to ‘engine’)). Each engine is then registered as participant in the other engine. The registration parameters detail the class and the arguments to the storage of the target engine.
This example is a bit dry / flat. A real world example would perhaps detail a ‘master’ engine connected to ‘departmental’ engines, something more hierarchical.
The example also binds reciprocally engines. If the delegated processes are always ‘forgotten’, one could imagine not binding the source engine as a participant in the target engine (not need to answer back).
There are then two variants for calling a subprocess
subprocess :ref => 'subprocess_name', :engine => 'engine1'
# or
participant :ref => 'engine1', :pdef => 'subprocess_name'
It’s OK to go for the shorter versions :
subprocess_name :engine => 'engine1'
# or
participant 'engine1', :pdef => 'subprocess_name'
engine1 :pdef => 'subprocess_name'
The subprocess is defined in the current process, or it’s given via its URL. The third variant is a subprocess bound as an engine variable.
engine.variables['variant_3'] = Ruote.process_definition do
participant 'hello_world_3'
end
pdef = Ruote.process_definition do
sequence do
engine1 :pdef => 'variant_1'
engine1 :pdef => 'http://pdefs.example.com/variant_2.rb'
engine1 :pdef => 'variant_3'
end
define 'variant_1' do
participant 'hello_world_1'
end
end
Instance Attribute Summary
Attributes included from LocalParticipant
Instance Method Summary collapse
- #cancel(fei, flavour) ⇒ Object
- #consume(workitem) ⇒ Object
-
#initialize(opts) ⇒ EngineParticipant
constructor
A new instance of EngineParticipant.
- #reply(fei, workitem) ⇒ Object
Methods included from LocalParticipant
#re_dispatch, #unschedule_re_dispatch
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply_to_engine, #sign
Constructor Details
#initialize(opts) ⇒ EngineParticipant
Returns a new instance of EngineParticipant.
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/ruote/part/engine_participant.rb', line 108 def initialize(opts) if pa = opts['storage_path'] require pa end kl = opts['storage_class'] raise(ArgumentError.new("missing 'storage_class' parameter")) unless kl args = opts['storage_args'] args = args.is_a?(Hash) ? [ args ] : Array(args) args << {} unless args.last.is_a?(Hash) args.last['preserve_configuration'] = true @storage = Ruote.constantize(kl).new(*args) end |
Instance Method Details
#cancel(fei, flavour) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/ruote/part/engine_participant.rb', line 149 def cancel(fei, flavour) exps = @storage.get_many('expressions', /^0![^!]+!#{fei.wfid}$/) return true if exps.size < 1 # participant expression will reply to its parent @storage.put_msg( 'cancel', 'fei' => exps.first['fei'], 'flavour' => flavour) false # participant expression will NOT reply to its parent end |
#consume(workitem) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/ruote/part/engine_participant.rb', line 126 def consume(workitem) wi = workitem.to_h fexp = Ruote::Exp::FlowExpression.fetch(@context, wi['fei']) params = wi['fields'].delete('params') forget = (fexp.attribute(:forget).to_s == 'true') @storage.put_msg( 'launch', 'wfid' => wi['fei']['wfid'], 'parent_id' => forget ? nil : wi['fei'], 'tree' => determine_tree(fexp, params), 'workitem' => wi, 'variables' => fexp.compile_variables) fexp.unpersist if forget # # special behaviour here in case of :forget => true : # parent_id of remote expression is set to nil and local expression # is unpersisted immediately end |
#reply(fei, workitem) ⇒ Object
165 166 167 168 169 170 171 |
# File 'lib/ruote/part/engine_participant.rb', line 165 def reply(fei, workitem) @storage.put_msg( 'reply', 'fei' => fei, 'workitem' => workitem) end |