Class: Ruote::Engine
- Inherits:
-
Object
- Object
- Ruote::Engine
- Includes:
- ReceiverMixin
- Defined in:
- lib/ruote/engine.rb
Overview
This class holds the ‘engine’ name, perhaps ‘dashboard’ would have been a better name. Anyway, the methods here allow to launch processes and to query about their status. There are also methods for fixing issues with stalled processes or processes stuck in errors.
NOTE : the methods #launch and #reply are implemented in Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#variables ⇒ Object
readonly
Returns the value of attribute variables.
Instance Method Summary collapse
-
#add_service(name, path_or_instance, classname = nil, opts = nil) ⇒ Object
Adds a service locally (will not get propagated to other workers).
-
#cancel(wi_or_fei_or_wfid) ⇒ Object
(also: #cancel_process, #cancel_expression)
Given a workitem or a fei, will do a cancel_expression, else it’s a wfid and it does a cancel_process.
-
#configuration(config_key) ⇒ Object
Returns a configuration value.
-
#configure(config_key, value) ⇒ Object
Sets a configuration option.
-
#error(wi_or_fei) ⇒ Object
Given a workitem or a fei (or a String version of a fei), returns the corresponding error (or nil if there is no other).
-
#errors(wfid = nil) ⇒ Object
Returns an array of current errors (hashes).
-
#history ⇒ Object
A shortcut for engine.context.history.
-
#initialize(worker_or_storage, opts = true) ⇒ Engine
constructor
Creates an engine using either worker or storage.
-
#join ⇒ Object
Joins the worker thread.
-
#kill(wi_or_fei_or_wfid) ⇒ Object
(also: #kill_process, #kill_expression)
Given a workitem or a fei, will do a kill_expression, else it’s a wfid and it does a kill_process.
-
#launch_single(process_definition, fields = {}, variables = {}) ⇒ Object
Quick note : the implementation of launch is found in the module Ruote::ReceiverMixin that the engine includes.
-
#leftovers ⇒ Object
Warning : expensive operation.
-
#load_definition(path) ⇒ Object
Loads (and turns into a tree) the process definition at the given path.
-
#noisy=(b) ⇒ Object
A debug helper :.
-
#on_error ⇒ Object
Returns the process tree that is triggered in case of error.
-
#on_error=(target) ⇒ Object
Sets a participant or subprocess to be triggered when an error occurs in a process instance.
-
#on_terminate ⇒ Object
Returns the process tree that is triggered in case of process termination.
-
#on_terminate=(target) ⇒ Object
Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.
-
#participant(name) ⇒ Object
Returns an instance of the participant registered under the given name.
-
#participant_list ⇒ Object
Returns a list of Ruote::ParticipantEntry instances.
-
#participant_list=(pl) ⇒ Object
Accepts a list of Ruote::ParticipantEntry instances or a list of [ regex, [ classname, opts ] ] arrays.
-
#pause(wi_or_fei_or_wfid, opts = {}) ⇒ Object
Given a wfid, will [attempt to] pause the corresponding process instance.
-
#process(wfid) ⇒ Object
Returns a ProcessStatus instance describing the current status of a process instance.
-
#process_ids ⇒ Object
(also: #process_wfids)
Returns a [sorted] list of wfids of the process instances currently running in the engine.
-
#processes(opts = {}) ⇒ Object
Returns an array of ProcessStatus instances.
-
#ps(wfid = nil) ⇒ Object
Returns a list of processes or the process status of a given process instance.
-
#re_apply(fei, opts = {}) ⇒ Object
Re-applies an expression (given via its FlowExpressionId).
-
#register(*args, &block) ⇒ Object
A shorter version of #register_participant.
-
#register_participant(regex, participant = nil, opts = {}, &block) ⇒ Object
Registers a participant in the engine.
-
#replay_at_error(err) ⇒ Object
Replays at a given error (hopefully you fixed the cause of the error before replaying…).
-
#resume(wi_or_fei_or_wfid, opts = {}) ⇒ Object
Given a wfid will [attempt to] resume the process instance.
-
#schedules(wfid = nil) ⇒ Object
Returns an array of schedules.
-
#shutdown ⇒ Object
Shuts down the engine, mostly passes the shutdown message to the other services and hope they’ll shut down properly.
-
#storage ⇒ Object
Returns the storage this engine works with passed at engine initialization.
-
#storage_participant ⇒ Object
A convenience method for.
-
#unregister_participant(name_or_participant) ⇒ Object
(also: #unregister)
Removes/unregisters a participant from the engine.
-
#wait_for(*items) ⇒ Object
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
-
#worker ⇒ Object
Returns the worker nested inside this engine (passed at initialization).
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply, #reply_to_engine, #sign
Constructor Details
#initialize(worker_or_storage, opts = true) ⇒ Engine
Creates an engine using either worker or storage.
If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.
If a worker instance is given as the first argument and the second argument is true, engine will start the worker and will be able to both manage and run workflows.
If the second options is set to { :join => true }, the worker will be started and run in the current thread (and the initialize method will not return).
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/ruote/engine.rb', line 62 def initialize(worker_or_storage, opts=true) @context = worker_or_storage.context @context.engine = self @variables = EngineVariables.new(@context.storage) if @context.worker if opts == true @context.worker.run_in_thread # runs worker in its own thread elsif opts == { :join => true } @context.worker.run # runs worker in current thread (and doesn't return) #else # worker is not run end #else # no worker end end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
45 46 47 |
# File 'lib/ruote/engine.rb', line 45 def context @context end |
#variables ⇒ Object (readonly)
Returns the value of attribute variables.
46 47 48 |
# File 'lib/ruote/engine.rb', line 46 def variables @variables end |
Instance Method Details
#add_service(name, path_or_instance, classname = nil, opts = nil) ⇒ Object
Adds a service locally (will not get propagated to other workers).
tracer = Tracer.new
@engine.add_service('tracer', tracer)
or
@engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
This method returns the service instance it just bound.
763 764 765 766 |
# File 'lib/ruote/engine.rb', line 763 def add_service(name, path_or_instance, classname=nil, opts=nil) @context.add_service(name, path_or_instance, classname, opts) end |
#cancel(wi_or_fei_or_wfid) ⇒ Object Also known as: cancel_process, cancel_expression
Given a workitem or a fei, will do a cancel_expression, else it’s a wfid and it does a cancel_process.
166 167 168 169 |
# File 'lib/ruote/engine.rb', line 166 def cancel(wi_or_fei_or_wfid) do_misc('cancel', wi_or_fei_or_wfid, {}) end |
#configuration(config_key) ⇒ Object
Returns a configuration value.
engine.configure('ruby_eval_allowed', true)
p engine.configuration('ruby_eval_allowed')
# => true
789 790 791 792 |
# File 'lib/ruote/engine.rb', line 789 def configuration(config_key) @context[config_key] end |
#configure(config_key, value) ⇒ Object
Sets a configuration option. Examples:
# allow remote workflow definitions (for subprocesses or when launching
# processes)
@engine.configure('remote_definition_allowed', true)
# allow ruby_eval
@engine.configure('ruby_eval_allowed', true)
777 778 779 780 |
# File 'lib/ruote/engine.rb', line 777 def configure(config_key, value) @context[config_key] = value end |
#error(wi_or_fei) ⇒ Object
Given a workitem or a fei (or a String version of a fei), returns the corresponding error (or nil if there is no other).
354 355 356 357 358 359 360 |
# File 'lib/ruote/engine.rb', line 354 def error(wi_or_fei) fei = Ruote.extract_fei(wi_or_fei) err = @context.storage.get('errors', "err_#{fei.sid}") err ? ProcessError.new(err) : nil end |
#errors(wfid = nil) ⇒ Object
Returns an array of current errors (hashes)
Can be called in two ways :
engine.errors(wfid)
and
engine.errors(:skip => 100, :limit => 100)
338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/ruote/engine.rb', line 338 def errors(wfid=nil) wfid, = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] errs = wfid.nil? ? @context.storage.get_many('errors', nil, ) : @context.storage.get_many('errors', wfid) return errs if [:count] errs.collect { |err| ProcessError.new(err) } end |
#history ⇒ Object
A shortcut for engine.context.history
103 104 105 106 |
# File 'lib/ruote/engine.rb', line 103 def history @context.history end |
#join ⇒ Object
Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.
479 480 481 482 |
# File 'lib/ruote/engine.rb', line 479 def join worker.join if worker end |
#kill(wi_or_fei_or_wfid) ⇒ Object Also known as: kill_process, kill_expression
Given a workitem or a fei, will do a kill_expression, else it’s a wfid and it does a kill_process.
177 178 179 180 |
# File 'lib/ruote/engine.rb', line 177 def kill(wi_or_fei_or_wfid) do_misc('kill', wi_or_fei_or_wfid, {}) end |
#launch_single(process_definition, fields = {}, variables = {}) ⇒ Object
Quick note : the implementation of launch is found in the module Ruote::ReceiverMixin that the engine includes.
Some processes have to have one and only one instance of themselves running, these are called ‘singles’ (‘singleton’ is too object-oriented).
When called, this method will check if an instance of the pdef is already running (it uses the process definition name attribute), if yes, it will return without having launched anything. If there is no such process running, it will launch it (and register it).
Returns the wfid (workflow instance id) of the running single.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/ruote/engine.rb', line 121 def launch_single(process_definition, fields={}, variables={}) tree = @context.reader.read(process_definition) name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first raise ArgumentError.new( 'process definition is missing a name, cannot launch as single' ) unless name singles = @context.storage.get('variables', 'singles') || { '_id' => 'singles', 'type' => 'variables', 'h' => {} } wfid, = singles['h'][name] return wfid if wfid && (ps(wfid) || Time.now.to_f - < 1.0) # return wfid if 'singleton' process is already running wfid = @context.wfidgen.generate singles['h'][name] = [ wfid, Time.now.to_f ] r = @context.storage.put(singles) return launch_single(tree, fields, variables) unless r.nil? # # the put failed, back to the start... # # all this to prevent races between multiple engines, # multiple launch_single calls (from different Ruby runtimes) # ... green for launch @context.storage.put_msg( 'launch', 'wfid' => wfid, 'tree' => tree, 'workitem' => { 'fields' => fields }, 'variables' => variables) wfid end |
#leftovers ⇒ Object
Warning : expensive operation.
Leftovers are workitems, errors and schedules belonging to process instances for which there are no more expressions left.
Better delete them or investigate why they are left here.
The result is a list of documents (hashes) as found in the storage. Each of them might represent a workitem, an error or a schedule.
If you want to delete one of them you can do
engine.storage.delete(doc)
415 416 417 418 419 420 421 422 423 424 425 |
# File 'lib/ruote/engine.rb', line 415 def leftovers wfids = @context.storage.expression_wfids({}) wis = @context.storage.get_many('workitems').compact ers = @context.storage.get_many('errors').compact scs = @context.storage.get_many('schedules').compact # some slow storages need the compaction... [c]ouch... (wis + ers + scs).reject { |doc| wfids.include?(doc['fei']['wfid']) } end |
#load_definition(path) ⇒ Object
Loads (and turns into a tree) the process definition at the given path.
486 487 488 489 |
# File 'lib/ruote/engine.rb', line 486 def load_definition(path) @context.reader.read(path) end |
#noisy=(b) ⇒ Object
A debug helper :
engine.noisy = true
will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.
891 892 893 894 |
# File 'lib/ruote/engine.rb', line 891 def noisy=(b) @context.logger.noisy = b end |
#on_error ⇒ Object
Returns the process tree that is triggered in case of error.
Note that this ‘on_error’ doesn’t trigger if an on_error is defined in the process itself.
Returns nil if there is no ‘on_error’ set.
801 802 803 804 805 806 807 |
# File 'lib/ruote/engine.rb', line 801 def on_error @context.storage.get_trackers['trackers']['on_error']['msg']['tree'] rescue nil end |
#on_error=(target) ⇒ Object
Sets a participant or subprocess to be triggered when an error occurs in a process instance.
engine.on_error = participant_name
engine.on_error = subprocess_name
engine.on_error = Ruote.process_definition do
alpha
end
Note that this ‘on_error’ doesn’t trigger if an on_error is defined in the process itself.
838 839 840 841 842 843 844 845 846 847 848 849 850 851 |
# File 'lib/ruote/engine.rb', line 838 def on_error=(target) @context.tracker.add_tracker( nil, # do not track a specific wfid 'error_intercepted', # react on 'error_intercepted' msgs 'on_error', # the identifier nil, # no specific condition { 'action' => 'launch', 'wfid' => 'replace', 'tree' => target.is_a?(String) ? [ 'define', {}, [ [ target, {}, [] ] ] ] : target, 'workitem' => 'replace', 'variables' => 'compile' }) end |
#on_terminate ⇒ Object
Returns the process tree that is triggered in case of process termination.
Note that a termination process doesn’t raise a termination process when it terminates itself.
Returns nil if there is no ‘on_terminate’ set.
816 817 818 819 820 821 822 |
# File 'lib/ruote/engine.rb', line 816 def on_terminate @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree'] rescue nil end |
#on_terminate=(target) ⇒ Object
Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.
engine.on_terminate = participant_name
engine.on_terminate = subprocess_name
engine.on_terminate = Ruote.define do
alpha
bravo
end
Note that a termination process doesn’t raise a termination process when it terminates itself.
on_terminate processes are not triggered for on_error processes. on_error processes are triggered for on_terminate processes as well.
871 872 873 874 875 876 877 878 879 880 881 882 |
# File 'lib/ruote/engine.rb', line 871 def on_terminate=(target) @context.tracker.add_tracker( nil, # do not track a specific wfid 'terminated', # react on 'error_intercepted' msgs 'on_terminate', # the identifier nil, # no specific condition { 'action' => 'launch', 'tree' => target.is_a?(String) ? [ 'define', {}, [ [ target, {}, [] ] ] ] : target, 'workitem' => 'replace' }) end |
#participant(name) ⇒ Object
Returns an instance of the participant registered under the given name. Returns nil if there is no participant registered for that name.
747 748 749 750 |
# File 'lib/ruote/engine.rb', line 747 def participant(name) @context.plist.lookup(name.to_s, nil) end |
#participant_list ⇒ Object
Returns a list of Ruote::ParticipantEntry instances.
engine.register_participant :alpha, MyParticipant, 'message' => 'hello'
# interrogate participant list
#
list = engine.participant_list
participant = list.first
p participant.regex
# => "^alpha$"
p participant.classname
# => "MyParticipant"
p participant.
# => {"message"=>"hello"}
# update participant list
#
participant.regex = '^alfred$'
engine.participant_list = list
706 707 708 709 |
# File 'lib/ruote/engine.rb', line 706 def participant_list @context.plist.list end |
#participant_list=(pl) ⇒ Object
Accepts a list of Ruote::ParticipantEntry instances or a list of
- regex, [ classname, opts
-
] arrays.
See Engine#participant_list
Some examples :
engine.participant_list = [
[ '^charly$', [ 'Ruote::StorageParticipant', {} ] ],
[ '.+', [ 'MyDefaultParticipant', { 'default' => true } ]
]
This method writes the participant list in one go, it might be easier to use than to register participant one by ones.
726 727 728 729 |
# File 'lib/ruote/engine.rb', line 726 def participant_list=(pl) @context.plist.list = pl end |
#pause(wi_or_fei_or_wfid, opts = {}) ⇒ Object
Given a wfid, will [attempt to] pause the corresponding process instance. Given an expression id (fei) will [attempt to] pause the expression and its children.
The only known option for now is :breakpoint => true, which lets the engine only pause the targetted expression.
fei and :breakpoint => true
By default, pausing an expression will pause that expression and all its children.
engine.pause(fei, :breakpoint => true)
will only flag as paused the given fei. When the children of that expression will reply to it, the execution for this branch of the process will stop, much like a break point.
204 205 206 207 208 209 210 211 |
# File 'lib/ruote/engine.rb', line 204 def pause(wi_or_fei_or_wfid, opts={}) raise ArgumentError.new( ':breakpoint option only valid when passing a workitem or a fei' ) if opts[:breakpoint] and wi_or_fei_or_wfid.is_a?(String) do_misc('pause', wi_or_fei_or_wfid, opts) end |
#process(wfid) ⇒ Object
Returns a ProcessStatus instance describing the current status of a process instance.
297 298 299 300 |
# File 'lib/ruote/engine.rb', line 297 def process(wfid) statuses([ wfid ], {}).first end |
#process_ids ⇒ Object Also known as: process_wfids
Returns a [sorted] list of wfids of the process instances currently running in the engine.
This operation is substantially less costly than Engine#processes (though the ‘how substantially’ depends on the storage chosen).
394 395 396 397 |
# File 'lib/ruote/engine.rb', line 394 def process_ids @context.storage.expression_wfids({}) end |
#processes(opts = {}) ⇒ Object
Returns an array of ProcessStatus instances.
WARNING : this is an expensive operation, but it understands :skip and :limit, so pagination is our friend.
Please note, if you’re interested only in processes that have errors, Engine#errors is a more efficient means.
To simply list the wfids of the currently running, Engine#process_wfids is way cheaper to call.
313 314 315 316 317 318 |
# File 'lib/ruote/engine.rb', line 313 def processes(opts={}) wfids = @context.storage.expression_wfids(opts) opts[:count] ? wfids.size : statuses(wfids, opts) end |
#ps(wfid = nil) ⇒ Object
Returns a list of processes or the process status of a given process instance.
323 324 325 326 |
# File 'lib/ruote/engine.rb', line 323 def ps(wfid=nil) wfid == nil ? processes : process(wfid) end |
#re_apply(fei, opts = {}) ⇒ Object
Re-applies an expression (given via its FlowExpressionId).
That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.
The fei parameter may be a hash, a Ruote::FlowExpressionId instance, a Ruote::Workitem instance or a sid string.
options
:tree is used to completely change the tree of the expression at re_apply
engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])
:fields is used to replace the fields of the workitem at re_apply
engine.re_apply(fei, :fields => { 'customer' => 'bob' })
:merge_in_fields is used to add / override fields
engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })
286 287 288 289 290 291 292 |
# File 'lib/ruote/engine.rb', line 286 def re_apply(fei, opts={}) @context.storage.put_msg( 'cancel', 'fei' => FlowExpressionId.extract_h(fei), 're_apply' => Ruote.keys_to_s(opts)) end |
#register(*args, &block) ⇒ Object
A shorter version of #register_participant
engine.register 'alice', MailParticipant, :target => '[email protected]'
or a block registering mechanism.
engine.register do
alpha 'Participants::Alpha', 'flavour' => 'vanilla'
participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
catchall ParticipantCharlie, 'flavour' => 'coconut'
end
Originally implemented in ruote-kit by Torsten Schoenebaum.
registration in block and :clear
By default, when registering multiple participants in block, ruote considers you’re wiping the participant list and re-adding them all.
You can prevent the clearing by stating :clear => false like in :
engine.register :clear => false do
alpha 'Participants::Alpha', 'flavour' => 'vanilla'
participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
catchall ParticipantCharlie, 'flavour' => 'coconut'
end
658 659 660 661 662 663 664 665 666 667 668 669 |
# File 'lib/ruote/engine.rb', line 658 def register(*args, &block) clear = args.first.is_a?(Hash) ? args.pop[:clear] : true if args.size > 0 register_participant(*args, &block) else @context.plist.clear if clear proxy = ParticipantRegistrationProxy.new(self) block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy) end end |
#register_participant(regex, participant = nil, opts = {}, &block) ⇒ Object
Registers a participant in the engine.
Takes the form
engine.register_participant name_or_regex, klass, opts={}
With the form
engine.register_participant name_or_regex do |workitem|
# ...
end
A BlockParticipant is automatically created.
name or regex
When registering participants, strings or regexes are accepted. Behind the scenes, a regex is kept.
Passing a string like “alain” will get ruote to automatically turn it into the following regex : /^alain$/.
For finer control over this, pass a regex directly
engine.register_participant /^user-/, MyParticipant
# will match all workitems whose participant name starts with "user-"
some examples
engine.register_participant 'compute_sum' do |wi|
wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
s + c * v # sum + count * value
end
# a block participant implicitely replies to the engine immediately
end
class MyParticipant
def initialize(opts)
@name = opts['name']
end
def consume(workitem)
workitem.fields['rocket_name'] = @name
send_to_the_moon(workitem)
end
def cancel(fei, flavour)
# do nothing
end
end
engine.register_participant(
/^moon-.+/, MyParticipant, 'name' => 'Saturn-V')
# computing the total for a invoice being passed in the workitem.
#
class TotalParticipant
include Ruote::LocalParticipant
def consume(workitem)
workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item|
t + item['count'] * PricingService.lookup(item['id'])
}
reply_to_engine(workitem)
end
end
engine.register_participant 'total', TotalParticipant
Remember that the options (the hash that follows the class name), must be serializable via JSON.
require_path and load_path
It’s OK to register a participant by passing its full classname as a String.
engine.register_participant(
'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
engine.register_participant(
'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
Note the option load_path / require_path that point to the ruby file containing the participant implementation. ‘require’ will load and eval the ruby code only once, ‘load’ each time.
:override => false
By default, when registering a participant, if this results in a regex that is already used, the previously registered participant gets unregistered.
engine.register_participant 'alpha', AaParticipant
engine.register_participant 'alpha', BbParticipant, :override => false
This can be useful when the #accept? method of participants are in use.
Note that using the #register(&block) method, :override => false is automatically enforced.
engine.register do
alpha AaParticipant
alpha BbParticipant
end
:position / :pos => ‘last’ / ‘first’ / ‘before’ / ‘after’ / ‘over’
One can specify the position where the participant should be inserted in the participant list.
engine.register_participant 'auditor', AuditParticipant, :pos => 'last'
-
last : it’s the default, places the participant at the end of the list
-
first : top of the list
-
before : implies :override => false, places before the existing participant with the same regex
-
after : implies :override => false, places after the last existing participant with the same regex
-
over : overrides in the same position (while the regular, default overide removes and then places the new participant at the end of the list)
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'lib/ruote/engine.rb', line 615 def register_participant(regex, participant=nil, opts={}, &block) if participant.is_a?(Hash) opts = participant participant = nil end pa = @context.plist.register(regex, participant, opts, block) @context.storage.put_msg( 'participant_registered', 'regex' => regex.is_a?(Regexp) ? regex.inspect : regex.to_s) pa end |
#replay_at_error(err) ⇒ Object
Replays at a given error (hopefully you fixed the cause of the error before replaying…)
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/ruote/engine.rb', line 239 def replay_at_error(err) err = error(err) unless err.is_a?(Ruote::ProcessError) msg = err.msg.dup if tree = msg['tree'] # # as soon as there is a tree, it means it's a re_apply re_apply(msg['fei'], 'tree' => tree, 'replay_at_error' => true) else action = msg.delete('action') msg['replay_at_error'] = true # just an indication @context.storage.delete(err.to_h) # remove error @context.storage.put_msg(action, msg) # trigger replay end end |
#resume(wi_or_fei_or_wfid, opts = {}) ⇒ Object
Given a wfid will [attempt to] resume the process instance. Given an expression id (fei) will [attempt to] to resume the expression and its children.
Note : this is supposed to be called on paused expressions / instances, this is NOT meant to be called to unstuck / unhang a process.
resume(wfid, :anyway => true)
Resuming a process instance is equivalent to calling resume on its root expression. If the root is not paused itself, this will have no effect.
engine.resume(wfid, :anyway => true)
will make sure to call resume on each of the paused branch within the process instance (tree), effectively resuming the whole process.
231 232 233 234 |
# File 'lib/ruote/engine.rb', line 231 def resume(wi_or_fei_or_wfid, opts={}) do_misc('resume', wi_or_fei_or_wfid, opts) end |
#schedules(wfid = nil) ⇒ Object
Returns an array of schedules. Those schedules are open structs with various properties, like target, owner, at, put_at, …
Introduced mostly for ruote-kit.
Can be called in two ways :
engine.schedules(wfid)
and
engine.schedules(:skip => 100, :limit => 100)
375 376 377 378 379 380 381 382 383 384 385 386 |
# File 'lib/ruote/engine.rb', line 375 def schedules(wfid=nil) wfid, = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ] scheds = wfid.nil? ? @context.storage.get_many('schedules', nil, ) : @context.storage.get_many('schedules', /!#{wfid}-\d+$/) return scheds if [:count] scheds.collect { |s| Ruote.schedule_to_h(s) }.sort_by { |s| s['wfid'] } end |
#shutdown ⇒ Object
Shuts down the engine, mostly passes the shutdown message to the other services and hope they’ll shut down properly.
430 431 432 433 |
# File 'lib/ruote/engine.rb', line 430 def shutdown @context.shutdown end |
#storage ⇒ Object
Returns the storage this engine works with passed at engine initialization.
87 88 89 90 |
# File 'lib/ruote/engine.rb', line 87 def storage @context.storage end |
#storage_participant ⇒ Object
A convenience method for
sp = Ruote::StorageParticipant.new(engine)
simply do
sp = engine.storage_participant
739 740 741 742 |
# File 'lib/ruote/engine.rb', line 739 def storage_participant @storage_participant ||= Ruote::StorageParticipant.new(self) end |
#unregister_participant(name_or_participant) ⇒ Object Also known as: unregister
Removes/unregisters a participant from the engine.
673 674 675 676 677 678 679 680 681 682 |
# File 'lib/ruote/engine.rb', line 673 def unregister_participant(name_or_participant) re = @context.plist.unregister(name_or_participant) raise(ArgumentError.new('participant not found')) unless re @context.storage.put_msg( 'participant_unregistered', 'regex' => re.to_s) end |
#wait_for(*items) ⇒ Object
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn’t see events handled by ‘other’ workers.
This method is only useful for test/quickstart/examples environments.
engine.wait_for(:alpha)
# will make the current thread block until a workitem is delivered
# to the participant named 'alpha'
engine.wait_for('123432123-9043')
# will make the current thread block until the processed whose
# wfid is given (String) terminates or produces an error.
engine.wait_for(5)
# will make the current thread block until 5 messages have been
# processed on the workqueue...
engine.wait_for(:empty)
# will return as soon as the engine/storage is empty, ie as soon
# as there are no more processes running in the engine (no more
# expressions placed in the storage)
It’s OK to wait for multiple wfids :
engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
465 466 467 468 469 470 471 472 473 474 |
# File 'lib/ruote/engine.rb', line 465 def wait_for(*items) logger = @context['s_logger'] raise( "can't wait_for, there is no logger that responds to that call" ) unless logger.respond_to?(:wait_for) logger.wait_for(items) end |
#worker ⇒ Object
Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (and the worker is running somewhere else (hopefully)).
96 97 98 99 |
# File 'lib/ruote/engine.rb', line 96 def worker @context.worker end |