Class: Ruote::Engine
- Inherits:
- Object
- Object
- Ruote::Engine
- Includes:
- ReceiverMixin
- Defined in:
- lib/ruote/engine.rb
Overview
This class holds the 'engine' name, though 'dashboard' would perhaps have been a better one. Its methods launch processes and query their status. There are also methods for fixing issues with stalled processes or processes stuck in errors.
NOTE : the methods #launch and #reply are implemented in Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).
Instance Attribute Summary
-
- (Object) context
readonly
Returns the value of attribute context.
-
- (Object) variables
readonly
Returns the value of attribute variables.
Instance Method Summary
-
- (Object) add_service(name, path_or_instance, classname = nil, opts = nil)
Adds a service locally (will not get propagated to other workers).
-
- (Object) cancel(wi_or_fei_or_wfid)
Given a workitem or a fei, does a cancel_expression; otherwise the argument is taken to be a wfid and a cancel_process is done.
-
- (Object) cancel_expression(fei)
Cancels a segment of a process instance.
-
- (Object) cancel_process(wfid)
Given a process identifier (wfid), cancels this process.
-
- (Object) configuration(config_key)
Returns a configuration value.
-
- (Object) configure(config_key, value)
Sets a configuration option.
-
- (Object) errors(wfid = nil)
Returns an array of current errors (hashes).
-
- (Object) history
A shortcut for engine.context.history.
-
- (Engine) initialize(worker_or_storage, opts = true)
constructor
Creates an engine using either worker or storage.
-
- (Object) join
Joins the worker thread.
-
- (Object) kill(wi_or_fei_or_wfid)
Given a workitem or a fei, does a kill_expression; otherwise the argument is taken to be a wfid and a kill_process is done.
-
- (Object) kill_expression(fei)
Like #cancel_expression, but :on_cancel attributes (of the expressions) are not triggered.
-
- (Object) kill_process(wfid)
Given a process identifier (wfid), kills this process.
-
- (Object) launch_single(process_definition, fields = {}, variables = {})
Launches a process definition as a 'single', i.e. only if no instance of it is already running (the implementation of launch itself is found in the module Ruote::ReceiverMixin that the engine includes).
-
- (Object) load_definition(path)
Loads (and turns into a tree) the process definition at the given path.
-
- (Object) noisy=(b)
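A debug helper: when set to true, the engine (in fact the worker) prints the details of the executing process instances to STDOUT.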
-
- (Object) on_error
Returns the process tree that is triggered in case of error.
-
- (Object) on_error=(target)
Sets a participant or subprocess to be triggered when an error occurs in a process instance.
-
- (Object) on_terminate
Returns the process tree that is triggered in case of process termination.
-
- (Object) on_terminate=(target)
Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.
-
- (Object) participant_list
Returns a list of Ruote::ParticipantEntry instances.
-
- (Object) participant_list=(pl)
Accepts a list of Ruote::ParticipantEntry instances or a list of [ regex, [ classname, opts ] ] arrays.
-
- (Object) process(wfid)
Returns a ProcessStatus instance describing the current status of a process instance.
-
- (Object) process_wfids
Returns a [sorted] list of wfids of the process instances currently running in the engine.
-
- (Object) processes(opts = {})
Returns an array of ProcessStatus instances.
-
- (Object) ps(wfid = nil)
Returns a list of processes or the process status of a given process instance.
-
- (Object) re_apply(fei, opts = {})
Re-applies an expression (given via its FlowExpressionId).
-
- (Object) register(*args, &block)
A shorter version of #register_participant.
-
- (Object) register_participant(regex, participant = nil, opts = {}, &block)
Registers a participant in the engine.
-
- (Object) replay_at_error(err)
Replays at a given error (hopefully you fixed the cause of the error before replaying…).
-
- (Object) schedules(wfid = nil)
Returns an array of schedules.
-
- (Object) shutdown
Shuts down the engine: mostly passes the shutdown message to the other services and hopes they'll shut down properly.
-
- (Object) storage
Returns the storage this engine works with (passed at engine initialization).
-
- (Object) storage_participant
A convenience method returning a Ruote::StorageParticipant bound to this engine.
-
- (Object) unregister_participant(name_or_participant)
(also: #unregister)
Removes/unregisters a participant from the engine.
-
- (Object) wait_for(*items)
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
-
- (Object) worker
Returns the worker nested inside this engine (passed at initialization).
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply, #reply_to_engine, #sign
Constructor Details
- (Engine) initialize(worker_or_storage, opts = true)
Creates an engine using either worker or storage.
If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.
If a worker instance is given as the first argument and the second argument is true, engine will start the worker and will be able to both manage and run workflows.
If the second argument is set to { :join => true }, the worker will be started and run in the current thread (and the call will not return).
# File 'lib/ruote/engine.rb', line 61
def initialize(worker_or_storage, opts=true)

  @context = worker_or_storage.context
  @context.engine = self

  @variables = EngineVariables.new(@context.storage)

  if @context.worker
    if opts == true
      @context.worker.run_in_thread
        # runs worker in its own thread
    elsif opts == { :join => true }
      @context.worker.run
        # runs worker in current thread (and doesn't return)
    #else
      # worker is not run
    end
  #else
    # no worker
  end
end
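To make the three cases concrete, here is a minimal, self-contained sketch of the branching on the second argument. It does not load ruote: SketchWorker and start_mode are hypothetical stand-ins invented for illustration, and unlike this stand-in, the real worker's run is documented as not returning.

```ruby
# Hypothetical stand-in for a ruote worker: it only records how it was started.
class SketchWorker
  attr_reader :mode

  def run_in_thread
    @mode = :own_thread
  end

  def run
    @mode = :current_thread
  end
end

# Mirrors the opts handling of Engine#initialize.
# Returns a symbol describing what happened to the worker.
def start_mode(worker, opts = true)
  return :no_worker if worker.nil?

  if opts == true
    worker.run_in_thread
  elsif opts == { :join => true }
    worker.run
  end

  worker.mode || :not_started
end
```

Note that the comparison is strict: any second argument other than true or exactly { :join => true } leaves the worker unstarted.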
Instance Attribute Details
- (Object) context (readonly)
Returns the value of attribute context
# File 'lib/ruote/engine.rb', line 45
def context
  @context
end
- (Object) variables (readonly)
Returns the value of attribute variables
# File 'lib/ruote/engine.rb', line 46
def variables
  @variables
end
Instance Method Details
- (Object) add_service(name, path_or_instance, classname = nil, opts = nil)
Adds a service locally (will not get propagated to other workers).
tracer = Tracer.new
@engine.add_service('tracer', tracer)
or
@engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
This method returns the service instance it just bound.
# File 'lib/ruote/engine.rb', line 646
def add_service(name, path_or_instance, classname=nil, opts=nil)
  @context.add_service(name, path_or_instance, classname, opts)
end
- (Object) cancel(wi_or_fei_or_wfid)
Given a workitem or a fei, does a cancel_expression; otherwise the argument is taken to be a wfid and a cancel_process is done.
# File 'lib/ruote/engine.rb', line 200
def cancel(wi_or_fei_or_wfid)

  target = Ruote.extract_id(wi_or_fei_or_wfid)

  if target.is_a?(String)
    cancel_process(target)
  else
    cancel_expression(target)
  end
end
- (Object) cancel_expression(fei)
Cancels a segment of a process instance. Since expressions are nodes in process instances, cancelling an expression cancels the expression and all its children (the segment of process).
# File 'lib/ruote/engine.rb', line 182
def cancel_expression(fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei)
end
- (Object) cancel_process(wfid)
Given a process identifier (wfid), cancels this process.
# File 'lib/ruote/engine.rb', line 164
def cancel_process(wfid)
  @context.storage.put_msg('cancel_process', 'wfid' => wfid)
end
- (Object) configuration(config_key)
Returns a configuration value.
engine.configure('ruby_eval_allowed', true)
p engine.configuration('ruby_eval_allowed')
# => true
# File 'lib/ruote/engine.rb', line 672
def configuration(config_key)
  @context[config_key]
end
- (Object) configure(config_key, value)
Sets a configuration option. Examples:
# allow remote workflow definitions (for subprocesses or when launching
# processes)
@engine.configure('remote_definition_allowed', true)
# allow ruby_eval
@engine.configure('ruby_eval_allowed', true)
# File 'lib/ruote/engine.rb', line 660
def configure(config_key, value)
  @context[config_key] = value
end
- (Object) errors(wfid = nil)
Returns an array of current errors (hashes)
Can be called in two ways :
engine.errors(wfid)
and
engine.errors(:skip => 100, :limit => 100)
# File 'lib/ruote/engine.rb', line 325
def errors(wfid=nil)

  wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]

  errs = wfid.nil? ?
    @context.storage.get_many('errors', nil, options) :
    @context.storage.get_many('errors', wfid)

  return errs if options[:count]

  errs.collect { |err| ProcessError.new(err) }
end
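The two calling styles work because the single argument is inspected: a Hash is treated as options, anything else as a wfid. The following standalone sketch isolates that idiom; split_wfid_and_options is a hypothetical helper name, not part of ruote.

```ruby
# Splits the single argument into [wfid, options]:
# a Hash means "no wfid, these are the options",
# anything else (including nil) means "this is the wfid, no options".
def split_wfid_and_options(arg = nil)
  arg.is_a?(Hash) ? [nil, arg] : [arg, {}]
end

# engine.errors(wfid) style
p split_wfid_and_options('20100612-bezerijozo')

# engine.errors(:skip => 100, :limit => 100) style
p split_wfid_and_options(:skip => 100, :limit => 100)
```

One consequence of this design is that a wfid and paging options cannot be combined in a single call.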
- (Object) history
A shortcut for engine.context.history
# File 'lib/ruote/engine.rb', line 102
def history
  @context.history
end
- (Object) join
Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.
# File 'lib/ruote/engine.rb', line 429
def join
  worker.join if worker
end
- (Object) kill(wi_or_fei_or_wfid)
Given a workitem or a fei, does a kill_expression; otherwise the argument is taken to be a wfid and a kill_process is done.
# File 'lib/ruote/engine.rb', line 214
def kill(wi_or_fei_or_wfid)

  target = Ruote.extract_id(wi_or_fei_or_wfid)

  if target.is_a?(String)
    kill_process(target)
  else
    kill_expression(target)
  end
end
- (Object) kill_expression(fei)
Like #cancel_expression, but :on_cancel attributes (of the expressions) are not triggered.
# File 'lib/ruote/engine.rb', line 191
def kill_expression(fei)
  fei = fei.to_h if fei.respond_to?(:to_h)
  @context.storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
end
- (Object) kill_process(wfid)
Given a process identifier (wfid), kills this process. Killing is equivalent to cancelling, but when killing, :on_cancel attributes are not triggered.
# File 'lib/ruote/engine.rb', line 173
def kill_process(wfid)
  @context.storage.put_msg('kill_process', 'wfid' => wfid)
end
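The cancel and kill variants differ only in the message they place in the storage. The sketch below collects the four messages side by side. SketchStorage and queue_cancel_and_kill are hypothetical names, and the fei hash is illustrative; only the action names and payload keys are taken from the listings.

```ruby
# Hypothetical in-memory stand-in for the storage's message queue.
class SketchStorage
  attr_reader :msgs

  def initialize
    @msgs = []
  end

  def put_msg(action, payload)
    @msgs << [action, payload]
  end
end

# Queues, in order, the messages put by cancel_expression, kill_expression,
# cancel_process and kill_process, then returns the queued messages.
def queue_cancel_and_kill(storage, fei)
  storage.put_msg('cancel', 'fei' => fei)
  storage.put_msg('cancel', 'fei' => fei, 'flavour' => 'kill')
  storage.put_msg('cancel_process', 'wfid' => fei['wfid'])
  storage.put_msg('kill_process', 'wfid' => fei['wfid'])
  storage.msgs
end
```

At the expression level a kill is a 'cancel' message carrying 'flavour' => 'kill', whereas at the process level kill has its own 'kill_process' action.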
- (Object) launch_single(process_definition, fields = {}, variables = {})
Quick note : the implementation of launch is found in the module Ruote::ReceiverMixin that the engine includes.
Some processes must have one and only one instance of themselves running; these are called 'singles' ('singleton' is too object-oriented).
When called, this method checks whether an instance of the pdef is already running (it uses the process definition's name attribute). If so, it returns without launching anything. If there is no such process running, it launches it (and registers it).
Returns the wfid (workflow instance id) of the running single.
# File 'lib/ruote/engine.rb', line 120
def launch_single(process_definition, fields={}, variables={})

  tree = @context.reader.read(process_definition)
  name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first

  raise ArgumentError.new(
    'process definition is missing a name, cannot launch as single'
  ) unless name

  singles = @context.storage.get('variables', 'singles') || {
    '_id' => 'singles', 'type' => 'variables', 'h' => {}
  }
  wfid, timestamp = singles['h'][name]

  return wfid if wfid && (ps(wfid) || Time.now.to_f - timestamp < 1.0)
    # return wfid if 'singleton' process is already running

  wfid = @context.wfidgen.generate

  singles['h'][name] = [ wfid, Time.now.to_f ]

  r = @context.storage.put(singles)

  return launch_single(tree, fields, variables) unless r.nil?
    #
    # the put failed, back to the start...
    #
    # all this to prevent races between multiple engines,
    # multiple launch_single calls (from different Ruby runtimes)

  # ... green for launch

  @context.storage.put_msg(
    'launch',
    'wfid' => wfid,
    'tree' => tree,
    'workitem' => { 'fields' => fields },
    'variables' => variables)

  wfid
end
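The observable behaviour of a 'single' can be sketched without a storage. The class below is a simplified, hypothetical model (SinglesSketch, its terminate method and the generated wfids are all invented for illustration): it keeps the name-to-wfid registry in memory and leaves out the timestamp check and the retry on a failed put that the real method uses to guard against races between runtimes.

```ruby
# Simplified in-memory model of 'singles': one running instance per name.
class SinglesSketch
  def initialize
    @singles = {}  # process definition name => wfid
    @running = {}  # wfid => true while the instance is running
    @counter = 0
  end

  # Returns the wfid of the running single, launching it only if needed.
  def launch_single(name)
    wfid = @singles[name]
    return wfid if wfid && @running[wfid]  # already running, launch nothing

    @counter += 1
    wfid = "sketch-#{@counter}"            # stand-in for a generated wfid
    @singles[name] = wfid
    @running[wfid] = true
    wfid
  end

  # Marks an instance as no longer running.
  def terminate(wfid)
    @running.delete(wfid)
  end
end
```

Calling launch_single twice with the same name returns the same wfid until the instance terminates; after that a new instance is launched.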
- (Object) load_definition(path)
Loads (and turns into a tree) the process definition at the given path.
# File 'lib/ruote/engine.rb', line 436
def load_definition(path)
  @context.reader.read(path)
end
- (Object) noisy=(b)
A debug helper :
engine.noisy = true
will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.
# File 'lib/ruote/engine.rb', line 774
def noisy=(b)
  @context.logger.noisy = b
end
- (Object) on_error
Returns the process tree that is triggered in case of error.
Note that this 'on_error' doesn't trigger if an on_error is defined in the process itself.
Returns nil if there is no 'on_error' set.
# File 'lib/ruote/engine.rb', line 684
def on_error

  @context.storage.get_trackers['trackers']['on_error']['msg']['tree']

rescue
  nil
end
- (Object) on_error=(target)
Sets a participant or subprocess to be triggered when an error occurs in a process instance.
engine.on_error = participant_name
engine.on_error = subprocess_name
engine.on_error = Ruote.process_definition do
  alpha
end
Note that this 'on_error' doesn't trigger if an on_error is defined in the process itself.
# File 'lib/ruote/engine.rb', line 721
def on_error=(target)

  @context.tracker.add_tracker(
    nil, # do not track a specific wfid
    'error_intercepted', # react on 'error_intercepted' msgs
    'on_error', # the identifier
    nil, # no specific condition
    { 'action' => 'launch',
      'wfid' => 'replace',
      'tree' => target.is_a?(String) ?
        [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
      'workitem' => 'replace',
      'variables' => 'compile' })
end
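A participant or subprocess name given as a String is wrapped into a one-step process tree, while anything else is used as the tree directly. This standalone sketch shows just that conversion; target_to_tree is a hypothetical helper name, and the non-String example tree is illustrative.

```ruby
# Wraps a participant/subprocess name into a minimal 'define' tree;
# non-String targets (already trees) are returned unchanged.
def target_to_tree(target)
  target.is_a?(String) ? ['define', {}, [[target, {}, []]]] : target
end

p target_to_tree('supervisor')
p target_to_tree(['define', {}, [['alpha', {}, []]]])
```

The same wrapping is applied by on_terminate= to its target.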
- (Object) on_terminate
Returns the process tree that is triggered in case of process termination.
Note that a termination process doesn't raise a termination process when it terminates itself.
Returns nil if there is no 'on_terminate' set.
# File 'lib/ruote/engine.rb', line 699
def on_terminate

  @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree']

rescue
  nil
end
- (Object) on_terminate=(target)
Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.
engine.on_terminate = participant_name
engine.on_terminate = subprocess_name
engine.on_terminate = Ruote.define do
  alpha
  bravo
end
Note that a termination process doesn't raise a termination process when it terminates itself.
on_terminate processes are not triggered for on_error processes. on_error processes are triggered for on_terminate processes as well.
# File 'lib/ruote/engine.rb', line 754
def on_terminate=(target)

  @context.tracker.add_tracker(
    nil, # do not track a specific wfid
    'terminated', # react on 'terminated' msgs
    'on_terminate', # the identifier
    nil, # no specific condition
    { 'action' => 'launch',
      'tree' => target.is_a?(String) ?
        [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
      'workitem' => 'replace' })
end
- (Object) participant_list
Returns a list of Ruote::ParticipantEntry instances.
engine.register_participant :alpha, MyParticipant, 'message' => 'hello'
# interrogate participant list
#
list = engine.participant_list
participant = list.first
p participant.regex
# => "^alpha$"
p participant.classname
# => "MyParticipant"
p participant.options
# => {"message"=>"hello"}
# update participant list
#
participant.regex = '^alfred$'
engine.participant_list = list
# File 'lib/ruote/engine.rb', line 597
def participant_list
  @context.plist.list
end
- (Object) participant_list=(pl)
Accepts a list of Ruote::ParticipantEntry instances or a list of [ regex, [ classname, opts ] ] arrays.
See Engine#participant_list
Some examples :
engine.participant_list = [
  [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ],
  [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ] ]
]
This method writes the participant list in one go; it might be easier to use than registering participants one by one.
# File 'lib/ruote/engine.rb', line 617
def participant_list=(pl)
  @context.plist.list = pl
end
- (Object) process(wfid)
Returns a ProcessStatus instance describing the current status of a process instance.
# File 'lib/ruote/engine.rb', line 277
def process(wfid)
  list_processes([ wfid ], {}).first
end
- (Object) process_wfids
Returns a [sorted] list of wfids of the process instances currently running in the engine.
This operation is substantially less costly than Engine#processes (though the 'how substantially' depends on the storage chosen).
# File 'lib/ruote/engine.rb', line 370
def process_wfids

  @context.storage.ids('expressions').collect { |sfei|
    sfei.split('!').last
  }.uniq.sort
end
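The listing derives wfids from expression ids alone, which is why it is cheap: no expression has to be loaded. A standalone sketch of that extraction follows. The sample ids are made up; the only property taken from the listing is that the wfid is the last '!'-separated field. wfids_from is a hypothetical helper name.

```ruby
# Extracts the sorted, de-duplicated wfids from a list of expression ids,
# taking the text after the last '!' of each id as the wfid.
def wfids_from(expression_ids)
  expression_ids.collect { |sfei| sfei.split('!').last }.uniq.sort
end

ids = [
  '0_1!x!20100612-bezerijozo',  # two expressions of the same process...
  '0!x!20100612-bezerijozo',
  '0!y!20100611-yakisoba'       # ...and one of another process
]

p wfids_from(ids)
```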
- (Object) processes(opts = {})
Returns an array of ProcessStatus instances.
WARNING : this is an expensive operation, but it understands :skip and :limit, so pagination is our friend.
Please note, if you're interested only in processes that have errors, Engine#errors is a more efficient means.
To simply list the wfids of the currently running processes, Engine#process_wfids is way cheaper to call.
# File 'lib/ruote/engine.rb', line 293
def processes(opts={})

  wfids = nil

  if opts.size > 0

    wfids = @context.storage.expression_wfids(opts)

    return wfids.size if opts[:count]
  end

  list_processes(wfids, opts)
end
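As a rough illustration of paging, the sketch below applies :skip and :limit to a plain list of wfids. It assumes the conventional meaning of the two options (drop the first :skip entries, then keep at most :limit); how a given storage actually implements them is not shown on this page, and page_of is a hypothetical helper name.

```ruby
# Returns the slice of wfids selected by :skip and :limit
# (conventional semantics, assumed here for illustration).
def page_of(wfids, opts = {})
  rest = wfids.drop(opts[:skip] || 0)
  opts[:limit] ? rest.first(opts[:limit]) : rest
end

wfids = %w[w1 w2 w3 w4 w5]

p page_of(wfids, :skip => 0, :limit => 2)  # first page
p page_of(wfids, :skip => 2, :limit => 2)  # second page
```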
- (Object) ps(wfid = nil)
Returns a list of processes or the process status of a given process instance.
# File 'lib/ruote/engine.rb', line 310
def ps(wfid=nil)
  wfid == nil ? processes : process(wfid)
end
- (Object) re_apply(fei, opts = {})
Re-applies an expression (given via its FlowExpressionId).
That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.
options
:tree is used to completely change the tree of the expression at re_apply
engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])
:fields is used to replace the fields of the workitem at re_apply
engine.re_apply(fei, :fields => { 'customer' => 'bob' })
:merge_in_fields is used to add / override fields
engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
# File 'lib/ruote/engine.rb', line 269
def re_apply(fei, opts={})
  @context.storage.put_msg('cancel', 'fei' => fei.to_h, 're_apply' => opts)
end
- (Object) register(*args, &block)
A shorter version of #register_participant
engine.register 'alice', MailParticipant, :target => 'alice@example.com'
or a block registering mechanism.
engine.register do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
  catchall ParticipantCharlie, 'flavour' => 'coconut'
end
Originally implemented in ruote-kit by Torsten Schoenebaum.
# File 'lib/ruote/engine.rb', line 552
def register(*args, &block)

  if args.size > 0
    register_participant(*args, &block)
  else
    proxy = ParticipantRegistrationProxy.new(self)
    block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  end
end
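The block form dispatches on the block's arity: a block without parameters is evaluated in the context of the proxy, while a block with a parameter receives the proxy as its argument. The sketch below reproduces only that dispatch. RegistrationProxySketch and register_with_block are hypothetical stand-ins, and the proxy here merely records what it is asked to register.

```ruby
# Hypothetical proxy that records registrations instead of performing them.
class RegistrationProxySketch
  attr_reader :registered

  def initialize
    @registered = []
  end

  def participant(name, classname, opts = {})
    @registered << [name, classname, opts]
  end
end

# Same arity test as Engine#register: instance_eval for parameterless blocks,
# an explicit call with the proxy otherwise. Returns what was recorded.
def register_with_block(&block)
  proxy = RegistrationProxySketch.new
  block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  proxy.registered
end

# Implicit receiver (the block runs with the proxy as self):
register_with_block do
  participant 'bravo', 'Participants::Bravo', 'flavour' => 'peach'
end

# Explicit receiver (the block keeps its own self):
register_with_block do |pr|
  pr.participant 'bravo', 'Participants::Bravo', 'flavour' => 'peach'
end
```

The explicit form is useful when the block needs to call methods or read instance variables of the surrounding object, which instance_eval would hide.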
- (Object) register_participant(regex, participant = nil, opts = {}, &block)
Registers a participant in the engine. Returns the participant instance.
Some examples :
require 'ruote/part/hash_participant'
alice = engine.register_participant 'alice', Ruote::HashParticipant
# register an in-memory (hash) store for Alice's workitems
engine.register_participant 'compute_sum' do |wi|
  wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
    s + c * v # sum + count * value
  end
  # a block participant implicitly replies to the engine immediately
end
class MyParticipant
  def initialize(name)
    @name = name
  end
  def consume(workitem)
    workitem.fields['rocket_name'] = @name
    send_to_the_moon(workitem)
  end
  def cancel(fei, flavour)
    # do nothing
  end
end
engine.register_participant /^moon-.+/, MyParticipant.new('Saturn-V')
'stateless' participants are preferred over 'stateful' ones
Ruote 2.1 is OK with 1 storage and 1+ workers. The workers may be in other Ruby runtimes. This implies that if you have registered a participant instance (instead of passing its classname and options), that participant will only run in the worker 'embedded' in the engine where it was registered. In other words, participants instantiated at registration time (and that includes block participants) only run in one worker, always the same one.
'stateless' participants, instantiated at each dispatch, are preferred. Any worker can handle them.
Block participants are still fine for demos, where the worker is included in the engine (see all the quickstarts). Small engines with 1 worker are not that bad either; not everybody is building huge systems.
Here is a 'stateless' participant example :
class MyStatelessParticipant
  def initialize(opts)
    @opts = opts
  end
  def consume(workitem)
    workitem.fields['rocket_name'] = @opts['name']
    send_to_the_moon(workitem)
  end
  def cancel(fei, flavour)
    # do nothing
  end
end
engine.register_participant(
  'moon', MyStatelessParticipant, 'name' => 'saturn5')
Remember that the options (the hash that follows the class name), must be serialisable via JSON.
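One way to check whether an options hash is safe to pass is to see if it survives a JSON round trip unchanged. The helper below is a hypothetical sketch using only Ruby's standard json library; json_safe? is not part of ruote, and whether ruote normalises keys itself is not stated here.

```ruby
require 'json'

# True when the options come back identical after being serialised to JSON
# and parsed again, i.e. nothing is lost or altered in transit.
def json_safe?(opts)
  JSON.parse(JSON.generate(opts)) == opts
end

p json_safe?('name' => 'saturn5', 'stages' => 3)  # string keys, plain values
p json_safe?(:name => 'saturn5')                  # symbol keys come back as strings
```

String keys with plain values (strings, numbers, booleans, nil, arrays, hashes) pass the check; symbol keys do not, because a plain JSON round trip returns them as strings, so code reading the options would look them up by string key, as the example above does with @opts['name'].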
require_path and load_path
It's OK to register a participant by passing its full classname as a String.
engine.register_participant(
'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
engine.register_participant(
'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
Note the option load_path / require_path that point to the ruby file containing the participant implementation. 'require' will load and eval the ruby code only once, 'load' each time.
# File 'lib/ruote/engine.rb', line 524
def register_participant(regex, participant=nil, opts={}, &block)

  if participant.is_a?(Hash)
    opts = participant
    participant = nil
  end

  pa = @context.plist.register(regex, participant, opts, block)

  @context.storage.put_msg(
    'participant_registered',
    'regex' => regex.to_s)

  pa
end
- (Object) replay_at_error(err)
Replays at a given error (hopefully you fixed the cause of the error before replaying…)
# File 'lib/ruote/engine.rb', line 228
def replay_at_error(err)

  msg = err.msg.dup
  action = msg.delete('action')

  msg['replay_at_error'] = true
    # just an indication

  if msg['tree'] && fei = msg['fei']
    #
    # nukes the expression in case of [re]apply
    #
    exp = Ruote::Exp::FlowExpression.fetch(@context, fei)
    exp.unpersist_or_raise if exp
  end

  @context.storage.delete(err.to_h) # remove error

  @context.storage.put_msg(action, msg) # trigger replay
end
- (Object) schedules(wfid = nil)
Returns an array of schedules. Those schedules are open structs with various properties, like target, owner, at, put_at, …
Introduced mostly for ruote-kit.
Can be called in two ways :
engine.schedules(wfid)
and
engine.schedules(:skip => 100, :limit => 100)
# File 'lib/ruote/engine.rb', line 351
def schedules(wfid=nil)

  wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]

  scheds = wfid.nil? ?
    @context.storage.get_many('schedules', nil, options) :
    @context.storage.get_many('schedules', /!#{wfid}-\d+$/)

  return scheds if options[:count]

  scheds.collect { |sched| Ruote.schedule_to_h(sched) }
end
- (Object) shutdown
Shuts down the engine: mostly passes the shutdown message to the other services and hopes they'll shut down properly.
# File 'lib/ruote/engine.rb', line 380
def shutdown
  @context.shutdown
end
- (Object) storage
Returns the storage this engine works with (passed at engine initialization).
# File 'lib/ruote/engine.rb', line 86
def storage
  @context.storage
end
- (Object) storage_participant
A convenience method. Instead of
sp = Ruote::StorageParticipant.new(engine)
simply do
sp = engine.storage_participant
# File 'lib/ruote/engine.rb', line 630
def storage_participant
  @storage_participant ||= Ruote::StorageParticipant.new(self)
end
- (Object) unregister_participant(name_or_participant) Also known as: unregister
Removes/unregisters a participant from the engine.
# File 'lib/ruote/engine.rb', line 564
def unregister_participant(name_or_participant)

  re = @context.plist.unregister(name_or_participant)

  raise(ArgumentError.new('participant not found')) unless re

  @context.storage.put_msg(
    'participant_unregistered',
    'regex' => re.to_s)
end
- (Object) wait_for(*items)
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn't see events handled by 'other' workers.
This method is only useful for test/quickstart/examples environments.
engine.wait_for(:alpha)
# will make the current thread block until a workitem is delivered
# to the participant named 'alpha'
engine.wait_for('123432123-9043')
# will make the current thread block until the process whose
# wfid is given (String) terminates or produces an error.
engine.wait_for(5)
# will make the current thread block until 5 messages have been
# processed on the workqueue...
engine.wait_for(:empty)
# will return as soon as the engine/storage is empty, ie as soon
# as there are no more processes running in the engine (no more
# expressions placed in the storage)
It's OK to wait for multiple wfids :
engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
# File 'lib/ruote/engine.rb', line 415
def wait_for(*items)

  logger = @context['s_logger']

  raise(
    "can't wait_for, there is no logger that responds to that call"
  ) unless logger.respond_to?(:wait_for)

  logger.wait_for(items)
end
- (Object) worker
Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (and the worker is running somewhere else (hopefully)).
# File 'lib/ruote/engine.rb', line 95
def worker
  @context.worker
end