Class: Ruote::Engine

Inherits:
Object
Includes:
ReceiverMixin
Defined in:
lib/ruote/engine.rb

Overview

This class holds the ‘engine’ name, though ‘dashboard’ would perhaps have been a better name. The methods here let you launch processes and query their status. There are also methods for fixing issues with stalled processes or processes stuck in errors.

NOTE : the methods #launch and #reply are implemented in Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).

Methods included from ReceiverMixin

#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply, #reply_to_engine, #sign

Constructor Details

#initialize(worker_or_storage, opts = true) ⇒ Engine

Creates an engine using either worker or storage.

If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.

If a worker instance is given as the first argument and the second argument is true (the default), the engine will start the worker in its own thread and will be able to both manage and run workflows.

If the second argument is set to { :join => true }, the worker will be started and run in the current thread (and the initialize method will not return). With any other value, the worker is not started.
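As a rough illustration of how the second argument selects a start mode, here is a minimal sketch. FakeWorker and start_worker are hypothetical stand-ins written for this example (they are not part of ruote); only the branching on true and { :join => true } follows the description above.

```ruby
# Hypothetical stand-in worker that records how it was started.
# (Not Ruote::Worker; a real worker's run method would block.)
class FakeWorker
  attr_reader :mode

  def run_in_thread
    @mode = :own_thread
  end

  def run
    @mode = :current_thread
  end
end

# Mirrors the branching on the second constructor argument.
def start_worker(worker, opts = true)
  return :no_worker if worker.nil?

  if opts == true
    worker.run_in_thread
  elsif opts == { :join => true }
    worker.run
  end

  worker.mode || :not_started
end

modes = [
  start_worker(FakeWorker.new),                    # default: own thread
  start_worker(FakeWorker.new, { :join => true }), # current thread
  start_worker(FakeWorker.new, false),             # worker left alone
  start_worker(nil)                                # storage only
]
```

Note that the comparison is an exact match on { :join => true }, so a hash with extra keys would leave the worker unstarted in this sketch.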



# File 'lib/ruote/engine.rb', line 62

def initialize(worker_or_storage, opts=true)

  @context = worker_or_storage.context
  @context.engine = self

  @variables = EngineVariables.new(@context.storage)

  if @context.worker
    if opts == true
      @context.worker.run_in_thread
        # runs worker in its own thread
    elsif opts == { :join => true }
      @context.worker.run
        # runs worker in current thread (and doesn't return)
    #else
      # worker is not run
    end
  #else
    # no worker
  end
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/ruote/engine.rb', line 45

def context
  @context
end

#variables ⇒ Object (readonly)

Returns the value of attribute variables.



# File 'lib/ruote/engine.rb', line 46

def variables
  @variables
end

Instance Method Details

#add_service(name, path_or_instance, classname = nil, opts = nil) ⇒ Object

Adds a service locally (will not get propagated to other workers).

tracer = Tracer.new
@engine.add_service('tracer', tracer)

or

@engine.add_service('tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')

This method returns the service instance it just bound.



# File 'lib/ruote/engine.rb', line 763

def add_service(name, path_or_instance, classname=nil, opts=nil)

  @context.add_service(name, path_or_instance, classname, opts)
end

#cancel(wi_or_fei_or_wfid) ⇒ Object Also known as: cancel_process, cancel_expression

Given a workitem or a fei, will do a cancel_expression, else it’s a wfid and it does a cancel_process.



# File 'lib/ruote/engine.rb', line 166

def cancel(wi_or_fei_or_wfid)

  do_misc('cancel', wi_or_fei_or_wfid, {})
end

#configuration(config_key) ⇒ Object

Returns a configuration value.

engine.configure('ruby_eval_allowed', true)

p engine.configuration('ruby_eval_allowed')
  # => true


# File 'lib/ruote/engine.rb', line 789

def configuration(config_key)

  @context[config_key]
end

#configure(config_key, value) ⇒ Object

Sets a configuration option. Examples:

# allow remote workflow definitions (for subprocesses or when launching
# processes)
@engine.configure('remote_definition_allowed', true)

# allow ruby_eval
@engine.configure('ruby_eval_allowed', true)


# File 'lib/ruote/engine.rb', line 777

def configure(config_key, value)

  @context[config_key] = value
end

#error(wi_or_fei) ⇒ Object

Given a workitem or a fei (or a String version of a fei), returns the corresponding error (or nil if there is none).



# File 'lib/ruote/engine.rb', line 354

def error(wi_or_fei)

  fei = Ruote.extract_fei(wi_or_fei)
  err = @context.storage.get('errors', "err_#{fei.sid}")

  err ? ProcessError.new(err) : nil
end

#errors(wfid = nil) ⇒ Object

Returns an array of current errors (ProcessError instances wrapping the stored hashes).

Can be called in two ways :

engine.errors(wfid)

and

engine.errors(:skip => 100, :limit => 100)
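The two call forms work because the single optional argument is inspected: a Hash is treated as options, anything else as a wfid. A minimal sketch of that normalisation, using a hypothetical helper name (split_wfid_and_options is not a ruote method):

```ruby
# Hypothetical helper: one optional argument carries either a wfid
# or an options hash.
def split_wfid_and_options(arg = nil)
  arg.is_a?(Hash) ? [nil, arg] : [arg, {}]
end

by_wfid = split_wfid_and_options('20100612-bezerijozo')
paged   = split_wfid_and_options({ :skip => 100, :limit => 100 })
all     = split_wfid_and_options
```

One consequence of this shape is that a wfid and paging options cannot be combined in a single call.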


# File 'lib/ruote/engine.rb', line 338

def errors(wfid=nil)

  wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]

  errs = wfid.nil? ?
    @context.storage.get_many('errors', nil, options) :
    @context.storage.get_many('errors', wfid)

  return errs if options[:count]

  errs.collect { |err| ProcessError.new(err) }
end

#history ⇒ Object

A shortcut for engine.context.history



# File 'lib/ruote/engine.rb', line 103

def history

  @context.history
end

#join ⇒ Object

Joins the worker thread. If this engine has no nested worker, calling this method will simply return immediately.



# File 'lib/ruote/engine.rb', line 479

def join

  worker.join if worker
end

#kill(wi_or_fei_or_wfid) ⇒ Object Also known as: kill_process, kill_expression

Given a workitem or a fei, will do a kill_expression, else it’s a wfid and it does a kill_process.



# File 'lib/ruote/engine.rb', line 177

def kill(wi_or_fei_or_wfid)

  do_misc('kill', wi_or_fei_or_wfid, {})
end

#launch_single(process_definition, fields = {}, variables = {}) ⇒ Object

Quick note : the implementation of launch is found in the module Ruote::ReceiverMixin that the engine includes.

Some processes have to have one and only one instance of themselves running; these are called ‘singles’ (‘singleton’ is too object-oriented).

When called, this method checks whether an instance of the pdef is already running (it uses the process definition's name attribute). If so, it returns without launching anything. If no such process is running, it launches it (and registers it).

Returns the wfid (workflow instance id) of the running single.

Raises:

  • (ArgumentError)
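The check-then-register step has to cope with several callers racing for the same name. Below is a simplified sketch of that idea, under stated assumptions: SinglesRegistry is a hypothetical in-memory stand-in for the storage (its put returns nil on success and non-nil on a stale write), claim_single is an illustrative helper, and the short grace period the real method applies to freshly registered wfids is left out.

```ruby
# Hypothetical in-memory stand-in for the document holding the singles.
class SinglesRegistry
  def initialize
    @doc = { 'h' => {}, 'rev' => 0 }
  end

  # Hands out a copy, as a storage would.
  def get
    { 'h' => @doc['h'].dup, 'rev' => @doc['rev'] }
  end

  # Returns nil on success, the current document if the revision is stale.
  def put(doc)
    return get if doc['rev'] != @doc['rev']

    @doc = { 'h' => doc['h'].dup, 'rev' => doc['rev'] + 1 }
    nil
  end
end

# Returns the wfid of the running single, registering new_wfid if none runs.
def claim_single(registry, name, running_wfids, new_wfid)
  loop do
    doc = registry.get
    wfid, _timestamp = doc['h'][name]

    return wfid if wfid && running_wfids.include?(wfid)

    doc['h'][name] = [new_wfid, Time.now.to_f]
    return new_wfid if registry.put(doc).nil?
    # the put failed (someone else wrote first), back to the start
  end
end

registry = SinglesRegistry.new
stale    = registry.get

first    = claim_single(registry, 'nightly', [], 'wfid-1')
conflict = registry.put(stale) # stale revision, so the write is refused
second   = claim_single(registry, 'nightly', ['wfid-1'], 'wfid-2')
third    = claim_single(registry, 'nightly', [], 'wfid-3')
```

Here second reuses the running wfid, while third replaces the entry because the registered process is no longer running.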


# File 'lib/ruote/engine.rb', line 121

def launch_single(process_definition, fields={}, variables={})

  tree = @context.reader.read(process_definition)
  name = tree[1]['name'] || (tree[1].find { |k, v| v.nil? } || []).first

  raise ArgumentError.new(
    'process definition is missing a name, cannot launch as single'
  ) unless name

  singles = @context.storage.get('variables', 'singles') || {
    '_id' => 'singles', 'type' => 'variables', 'h' => {}
  }
  wfid, timestamp = singles['h'][name]

  return wfid if wfid && (ps(wfid) || Time.now.to_f - timestamp < 1.0)
    # return wfid if 'singleton' process is already running

  wfid = @context.wfidgen.generate

  singles['h'][name] = [ wfid, Time.now.to_f ]

  r = @context.storage.put(singles)

  return launch_single(tree, fields, variables) unless r.nil?
    #
    # the put failed, back to the start...
    #
    # all this to prevent races between multiple engines,
    # multiple launch_single calls (from different Ruby runtimes)

  # ... green for launch

  @context.storage.put_msg(
    'launch',
    'wfid' => wfid,
    'tree' => tree,
    'workitem' => { 'fields' => fields },
    'variables' => variables)

  wfid
end

#leftovers ⇒ Object

Warning : expensive operation.

Leftovers are workitems, errors and schedules belonging to process instances for which there are no more expressions left.

Better delete them or investigate why they are left here.

The result is a list of documents (hashes) as found in the storage. Each of them might represent a workitem, an error or a schedule.

If you want to delete one of them you can do

engine.storage.delete(doc)
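To make the definition of a leftover concrete, here is a small sketch on plain hashes. The documents and wfids are made up for the example; only the shape doc['fei']['wfid'] and the filtering rule come from the description above.

```ruby
# wfids that still have expressions in the storage (illustrative data)
live_wfids = ['20100612-bezerijozo']

# documents as they might come back from the storage (illustrative data)
docs = [
  { 'type' => 'workitems', 'fei' => { 'wfid' => '20100612-bezerijozo' } },
  { 'type' => 'errors',    'fei' => { 'wfid' => '20100612-yakisoba' } },
  { 'type' => 'schedules', 'fei' => { 'wfid' => '20100612-yakisoba' } }
]

# a leftover is any document whose process has no expression left
leftovers = docs.reject { |doc| live_wfids.include?(doc['fei']['wfid']) }
```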


# File 'lib/ruote/engine.rb', line 415

def leftovers

  wfids = @context.storage.expression_wfids({})

  wis = @context.storage.get_many('workitems').compact
  ers = @context.storage.get_many('errors').compact
  scs = @context.storage.get_many('schedules').compact
    # some slow storages need the compaction... [c]ouch...

  (wis + ers + scs).reject { |doc| wfids.include?(doc['fei']['wfid']) }
end

#load_definition(path) ⇒ Object

Loads (and turns into a tree) the process definition at the given path.



# File 'lib/ruote/engine.rb', line 486

def load_definition(path)

  @context.reader.read(path)
end

#noisy=(b) ⇒ Object

A debug helper :

engine.noisy = true

will let the engine (in fact the worker) pour all the details of the executing process instances to STDOUT.



# File 'lib/ruote/engine.rb', line 891

def noisy=(b)

  @context.logger.noisy = b
end

#on_error ⇒ Object

Returns the process tree that is triggered in case of error.

Note that this ‘on_error’ doesn’t trigger if an on_error is defined in the process itself.

Returns nil if there is no ‘on_error’ set.



# File 'lib/ruote/engine.rb', line 801

def on_error

  @context.storage.get_trackers['trackers']['on_error']['msg']['tree']

rescue
  nil
end

#on_error=(target) ⇒ Object

Sets a participant or subprocess to be triggered when an error occurs in a process instance.

engine.on_error = participant_name

engine.on_error = subprocess_name

engine.on_error = Ruote.process_definition do
  alpha
end

Note that this ‘on_error’ doesn’t trigger if an on_error is defined in the process itself.
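A participant or subprocess name is given as a String, while a process definition is already a tree. The sketch below shows how a name can be wrapped into a one-step tree so that both forms end up as something launchable; tracker_tree is a hypothetical helper name used only for this illustration.

```ruby
# Hypothetical helper: wraps a name into a minimal 'define' tree,
# leaves an existing tree untouched.
def tracker_tree(target)
  target.is_a?(String) ? ['define', {}, [[target, {}, []]]] : target
end

from_name = tracker_tree('alpha')

existing  = ['define', {}, [['alpha', {}, []], ['bravo', {}, []]]]
from_tree = tracker_tree(existing)
```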



# File 'lib/ruote/engine.rb', line 838

def on_error=(target)

  @context.tracker.add_tracker(
    nil, # do not track a specific wfid
    'error_intercepted', # react on 'error_intercepted' msgs
    'on_error', # the identifier
    nil, # no specific condition
    { 'action' => 'launch',
      'wfid' => 'replace',
      'tree' => target.is_a?(String) ?
        [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
      'workitem' => 'replace',
      'variables' => 'compile' })
end

#on_terminate ⇒ Object

Returns the process tree that is triggered in case of process termination.

Note that a termination process doesn’t raise a termination process when it terminates itself.

Returns nil if there is no ‘on_terminate’ set.



# File 'lib/ruote/engine.rb', line 816

def on_terminate

  @context.storage.get_trackers['trackers']['on_terminate']['msg']['tree']

rescue
  nil
end

#on_terminate=(target) ⇒ Object

Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.

engine.on_terminate = participant_name

engine.on_terminate = subprocess_name

engine.on_terminate = Ruote.define do
  alpha
  bravo
end

Note that a termination process doesn’t raise a termination process when it terminates itself.

on_terminate processes are not triggered for on_error processes. on_error processes are triggered for on_terminate processes as well.



# File 'lib/ruote/engine.rb', line 871

def on_terminate=(target)

  @context.tracker.add_tracker(
    nil, # do not track a specific wfid
    'terminated', # react on 'terminated' msgs
    'on_terminate', # the identifier
    nil, # no specific condition
    { 'action' => 'launch',
      'tree' => target.is_a?(String) ?
        [ 'define', {}, [ [ target, {}, [] ] ] ] : target,
      'workitem' => 'replace' })
end

#participant(name) ⇒ Object

Returns an instance of the participant registered under the given name. Returns nil if there is no participant registered for that name.



# File 'lib/ruote/engine.rb', line 747

def participant(name)

  @context.plist.lookup(name.to_s, nil)
end

#participant_list ⇒ Object

Returns a list of Ruote::ParticipantEntry instances.

engine.register_participant :alpha, MyParticipant, 'message' => 'hello'

# interrogate participant list
#
list = engine.participant_list
participant = list.first
p participant.regex
  # => "^alpha$"
p participant.classname
  # => "MyParticipant"
p participant.options
  # => {"message"=>"hello"}

# update participant list
#
participant.regex = '^alfred$'
engine.participant_list = list


# File 'lib/ruote/engine.rb', line 706

def participant_list

  @context.plist.list
end

#participant_list=(pl) ⇒ Object

Accepts a list of Ruote::ParticipantEntry instances or a list of [ regex, [ classname, opts ] ] arrays.

See Engine#participant_list

Some examples :

engine.participant_list = [
  [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ],
  [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ] ]
]

This method writes the participant list in one go, which might be easier than registering participants one by one.



# File 'lib/ruote/engine.rb', line 726

def participant_list=(pl)

  @context.plist.list = pl
end

#pause(wi_or_fei_or_wfid, opts = {}) ⇒ Object

Given a wfid, will [attempt to] pause the corresponding process instance. Given an expression id (fei) will [attempt to] pause the expression and its children.

The only known option for now is :breakpoint => true, which lets the engine only pause the targeted expression.

fei and :breakpoint => true

By default, pausing an expression will pause that expression and all its children.

engine.pause(fei, :breakpoint => true)

will only flag the given fei as paused. When the children of that expression reply to it, the execution for this branch of the process will stop, much like a breakpoint.

Raises:

  • (ArgumentError)
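The ArgumentError is raised when :breakpoint is combined with a String argument, since a breakpoint targets a single expression. A minimal sketch of that guard, assuming a hypothetical check_pause_args helper and a hypothetical Fei struct standing in for a real expression id:

```ruby
# Hypothetical stand-in for an expression id (not Ruote::FlowExpressionId).
Fei = Struct.new(:wfid, :expid)

# Hypothetical helper reproducing the argument check only.
def check_pause_args(target, opts = {})
  if opts[:breakpoint] && target.is_a?(String)
    raise ArgumentError,
      ':breakpoint option only valid when passing a workitem or a fei'
  end
  true
end

fei = Fei.new('20100612-bezerijozo', '0_0')

ok_fei  = check_pause_args(fei, { :breakpoint => true })
ok_wfid = check_pause_args('20100612-bezerijozo')

rejected =
  begin
    check_pause_args('20100612-bezerijozo', { :breakpoint => true })
    false
  rescue ArgumentError
    true
  end
```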


# File 'lib/ruote/engine.rb', line 204

def pause(wi_or_fei_or_wfid, opts={})

  raise ArgumentError.new(
    ':breakpoint option only valid when passing a workitem or a fei'
  ) if opts[:breakpoint] and wi_or_fei_or_wfid.is_a?(String)

  do_misc('pause', wi_or_fei_or_wfid, opts)
end

#process(wfid) ⇒ Object

Returns a ProcessStatus instance describing the current status of a process instance.



# File 'lib/ruote/engine.rb', line 297

def process(wfid)

  statuses([ wfid ], {}).first
end

#process_ids ⇒ Object Also known as: process_wfids

Returns a [sorted] list of wfids of the process instances currently running in the engine.

This operation is substantially less costly than Engine#processes (though the ‘how substantially’ depends on the storage chosen).



# File 'lib/ruote/engine.rb', line 394

def process_ids

  @context.storage.expression_wfids({})
end

#processes(opts = {}) ⇒ Object

Returns an array of ProcessStatus instances.

WARNING : this is an expensive operation, but it understands :skip and :limit, so pagination is our friend.

Please note, if you’re interested only in processes that have errors, Engine#errors is a more efficient means.

To simply list the wfids of the currently running processes, Engine#process_wfids is way cheaper to call.



# File 'lib/ruote/engine.rb', line 313

def processes(opts={})

  wfids = @context.storage.expression_wfids(opts)

  opts[:count] ? wfids.size : statuses(wfids, opts)
end

#ps(wfid = nil) ⇒ Object

Returns a list of processes or the process status of a given process instance.



# File 'lib/ruote/engine.rb', line 323

def ps(wfid=nil)

  wfid == nil ? processes : process(wfid)
end

#re_apply(fei, opts = {}) ⇒ Object

Re-applies an expression (given via its FlowExpressionId).

That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.

The fei parameter may be a hash, a Ruote::FlowExpressionId instance, a Ruote::Workitem instance or a sid string.

options

:tree is used to completely change the tree of the expression at re_apply

engine.re_apply(fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])

:fields is used to replace the fields of the workitem at re_apply

engine.re_apply(fei, :fields => { 'customer' => 'bob' })

:merge_in_fields is used to add / override fields

engine.re_apply(fei, :merge_in_fields => { 'customer' => 'bob' })


# File 'lib/ruote/engine.rb', line 286

def re_apply(fei, opts={})

  @context.storage.put_msg(
    'cancel',
    'fei' => FlowExpressionId.extract_h(fei),
    're_apply' => Ruote.keys_to_s(opts))
end

#register(*args, &block) ⇒ Object

A shorter version of #register_participant

engine.register 'alice', MailParticipant, :target => '[email protected]'

or a block registering mechanism.

engine.register do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
  catchall ParticipantCharlie, 'flavour' => 'coconut'
end

Originally implemented in ruote-kit by Torsten Schoenebaum.

registration in block and :clear

By default, when registering multiple participants in a block, ruote assumes you are wiping the participant list and re-adding them all.

You can prevent the clearing by stating :clear => false, as in:

engine.register :clear => false do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo', :flavour => 'peach'
  catchall ParticipantCharlie, 'flavour' => 'coconut'
end
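The block form can be written with or without a block parameter. One way to support both is to look at the block's arity, as sketched below. RegistrationProxy and register_with_block are hypothetical names for this illustration; the proxy merely records entries and says nothing about how ruote's own proxy is built.

```ruby
# Hypothetical proxy that records registrations instead of performing them.
class RegistrationProxy
  attr_reader :entries

  def initialize
    @entries = []
  end

  def participant(name, klass, opts = {})
    @entries << [name, klass, opts]
  end

  # Any other method name is taken as the participant name.
  def method_missing(name, *args)
    participant(name.to_s, *args)
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

def register_with_block(&block)
  proxy = RegistrationProxy.new
  # no block parameter: evaluate inside the proxy; otherwise pass the proxy in
  block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  proxy.entries
end

implicit = register_with_block do
  alpha 'Participants::Alpha', 'flavour' => 'vanilla'
  participant 'bravo', 'Participants::Bravo'
end

explicit = register_with_block do |r|
  r.participant 'charly', 'Participants::Charly'
end
```

The explicit form keeps self unchanged inside the block, which matters when the block needs to call methods of the surrounding object.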


# File 'lib/ruote/engine.rb', line 658

def register(*args, &block)

  clear = args.first.is_a?(Hash) ? args.pop[:clear] : true

  if args.size > 0
    register_participant(*args, &block)
  else
    @context.plist.clear if clear
    proxy = ParticipantRegistrationProxy.new(self)
    block.arity < 1 ? proxy.instance_eval(&block) : block.call(proxy)
  end
end

#register_participant(regex, participant = nil, opts = {}, &block) ⇒ Object

Registers a participant in the engine.

Takes the form

engine.register_participant name_or_regex, klass, opts={}

With the form

engine.register_participant name_or_regex do |workitem|
  # ...
end

A BlockParticipant is automatically created.

name or regex

When registering participants, strings or regexes are accepted. Behind the scenes, a regex is kept.

Passing a string like “alain” will get ruote to automatically turn it into the following regex : /^alain$/.

For finer control over this, pass a regex directly

engine.register_participant /^user-/, MyParticipant
  # will match all workitems whose participant name starts with "user-"
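The name-to-regex rule can be pictured with the sketch below. to_participant_regex is a hypothetical helper, and it anchors the name as-is; whether ruote escapes special characters in names is not stated here, so the example sticks to plain names.

```ruby
# Hypothetical helper: strings become anchored regexes, regexes pass through.
def to_participant_regex(name_or_regex)
  return name_or_regex if name_or_regex.is_a?(Regexp)

  Regexp.new("^#{name_or_regex}$")
end

exact  = to_participant_regex('alain')
prefix = to_participant_regex(/^user-/)

exact_hit   = !exact.match('alain').nil?
exact_miss  = exact.match('alain2').nil?
prefix_hit  = !prefix.match('user-42').nil?
```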

some examples

engine.register_participant 'compute_sum' do |wi|
  wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)|
    s + c * v # sum + count * value
  end
  # a block participant implicitly replies to the engine immediately
end

class MyParticipant
  def initialize(opts)
    @name = opts['name']
  end
  def consume(workitem)
    workitem.fields['rocket_name'] = @name
    send_to_the_moon(workitem)
  end
  def cancel(fei, flavour)
    # do nothing
  end
end

engine.register_participant(
  /^moon-.+/, MyParticipant, 'name' => 'Saturn-V')

# computing the total for an invoice being passed in the workitem.
#
class TotalParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item|
      t + item['count'] * PricingService.lookup(item['id'])
    }
    reply_to_engine(workitem)
  end
end
engine.register_participant 'total', TotalParticipant

Remember that the options (the hash that follows the class name) must be serializable via JSON.
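A quick way to spot options that would not survive serialization is a strict JSON round trip, sketched below with the standard json library. json_round_trips? is a hypothetical helper, and the check is stricter than strictly necessary: symbol keys fail it only because JSON turns them into strings, which ruote may well tolerate.

```ruby
require 'json'

# Hypothetical helper: true when the options come back unchanged from JSON.
def json_round_trips?(opts)
  JSON.parse(JSON.generate(opts)) == opts
end

string_keys = json_round_trips?({ 'name' => 'Saturn-V', 'stages' => 3 })
symbol_keys = json_round_trips?({ :name => 'Saturn-V' })
with_time   = json_round_trips?({ 'launched_at' => Time.at(0) })
```

Values such as Time instances come back as strings, so they are better stored in a string form chosen explicitly.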

require_path and load_path

It’s OK to register a participant by passing its full classname as a String.

engine.register_participant(
  'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb')
engine.register_participant(
  'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')

Note the load_path / require_path options, which point to the ruby file containing the participant implementation. ‘require’ will load and eval the ruby code only once, ‘load’ each time.

:override => false

By default, when registering a participant, if this results in a regex that is already used, the previously registered participant gets unregistered.

engine.register_participant 'alpha', AaParticipant
engine.register_participant 'alpha', BbParticipant, :override => false

This can be useful when the #accept? method of participants is in use.

Note that using the #register(&block) method, :override => false is automatically enforced.

engine.register do
  alpha AaParticipant
  alpha BbParticipant
end

:position / :pos => ‘last’ / ‘first’ / ‘before’ / ‘after’ / ‘over’

One can specify the position where the participant should be inserted in the participant list.

engine.register_participant 'auditor', AuditParticipant, :pos => 'last'
  • last : it’s the default, places the participant at the end of the list

  • first : top of the list

  • before : implies :override => false, places before the existing participant with the same regex

  • after : implies :override => false, places after the last existing participant with the same regex

  • over : overrides in the same position (while the regular, default override removes and then places the new participant at the end of the list)
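The five positions can be pictured on a plain array of [ regex, classname ] pairs. This is only a sketch of the list semantics described above: place is a hypothetical helper, and its behaviour when no entry with the same regex exists (append at the end), as well as the removal of same-regex entries for 'first', are assumptions made for the example.

```ruby
# Hypothetical helper working on [regex, classname] pairs.
def place(list, entry, pos = 'last')
  regex   = entry.first
  first_i = list.index  { |e| e.first == regex }
  last_i  = list.rindex { |e| e.first == regex }
  kept    = list.reject { |e| e.first == regex } # default override

  case pos
  when 'last'   then kept + [entry]
  when 'first'  then [entry] + kept
  when 'before' then first_i ? list.dup.insert(first_i, entry) : list + [entry]
  when 'after'  then last_i ? list.dup.insert(last_i + 1, entry) : list + [entry]
  when 'over'
    return list + [entry] unless first_i
    list.each_with_index.map { |e, i| i == first_i ? entry : e }
  else
    raise ArgumentError, "unknown position #{pos.inspect}"
  end
end

list  = [['^alpha$', 'Aa'], ['^bravo$', 'Bb']]
entry = ['^alpha$', 'Cc']

placed = %w[last first before after over].map do |pos|
  [pos, place(list, entry, pos).map(&:last)]
end.to_h
```

In this sketch 'before' and 'after' keep the existing entry (no override), while 'over' swaps it in place.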



# File 'lib/ruote/engine.rb', line 615

def register_participant(regex, participant=nil, opts={}, &block)

  if participant.is_a?(Hash)
    opts = participant
    participant = nil
  end

  pa = @context.plist.register(regex, participant, opts, block)

  @context.storage.put_msg(
    'participant_registered',
    'regex' => regex.is_a?(Regexp) ? regex.inspect : regex.to_s)

  pa
end

#replay_at_error(err) ⇒ Object

Replays at a given error (hopefully you fixed the cause of the error before replaying…)



# File 'lib/ruote/engine.rb', line 239

def replay_at_error(err)

  err = error(err) unless err.is_a?(Ruote::ProcessError)

  msg = err.msg.dup

  if tree = msg['tree']
    #
    # as soon as there is a tree, it means it's a re_apply

    re_apply(msg['fei'], 'tree' => tree, 'replay_at_error' => true)

  else

    action = msg.delete('action')

    msg['replay_at_error'] = true
      # just an indication

    @context.storage.delete(err.to_h) # remove error
    @context.storage.put_msg(action, msg) # trigger replay
  end
end

#resume(wi_or_fei_or_wfid, opts = {}) ⇒ Object

Given a wfid, will [attempt to] resume the process instance. Given an expression id (fei), will [attempt to] resume the expression and its children.

Note : this is supposed to be called on paused expressions / instances, this is NOT meant to be called to unstick / unhang a process.

resume(wfid, :anyway => true)

Resuming a process instance is equivalent to calling resume on its root expression. If the root is not paused itself, this will have no effect.

engine.resume(wfid, :anyway => true)

will make sure to call resume on each of the paused branches within the process instance (tree), effectively resuming the whole process.



# File 'lib/ruote/engine.rb', line 231

def resume(wi_or_fei_or_wfid, opts={})

  do_misc('resume', wi_or_fei_or_wfid, opts)
end

#schedules(wfid = nil) ⇒ Object

Returns an array of schedules. Those schedules are open structs with various properties, like target, owner, at, put_at, …

Introduced mostly for ruote-kit.

Can be called in two ways :

engine.schedules(wfid)

and

engine.schedules(:skip => 100, :limit => 100)


# File 'lib/ruote/engine.rb', line 375

def schedules(wfid=nil)

  wfid, options = wfid.is_a?(Hash) ? [ nil, wfid ] : [ wfid, {} ]

  scheds = wfid.nil? ?
    @context.storage.get_many('schedules', nil, options) :
    @context.storage.get_many('schedules', /!#{wfid}-\d+$/)

  return scheds if options[:count]

  scheds.collect { |s| Ruote.schedule_to_h(s) }.sort_by { |s| s['wfid'] }
end

#shutdown ⇒ Object

Shuts down the engine. It mostly passes the shutdown message to the other services and hopes they’ll shut down properly.



# File 'lib/ruote/engine.rb', line 430

def shutdown

  @context.shutdown
end

#storage ⇒ Object

Returns the storage this engine works with (passed at engine initialization).



# File 'lib/ruote/engine.rb', line 87

def storage

  @context.storage
end

#storage_participant ⇒ Object

A convenience method for

sp = Ruote::StorageParticipant.new(engine)

simply do

sp = engine.storage_participant


# File 'lib/ruote/engine.rb', line 739

def storage_participant

  @storage_participant ||= Ruote::StorageParticipant.new(self)
end

#unregister_participant(name_or_participant) ⇒ Object Also known as: unregister

Removes/unregisters a participant from the engine.

Raises:

  • (ArgumentError)


# File 'lib/ruote/engine.rb', line 673

def unregister_participant(name_or_participant)

  re = @context.plist.unregister(name_or_participant)

  raise(ArgumentError.new('participant not found')) unless re

  @context.storage.put_msg(
    'participant_unregistered',
    'regex' => re.to_s)
end

#wait_for(*items) ⇒ Object

This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.

WARNING : wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn’t see events handled by ‘other’ workers.

This method is only useful for test/quickstart/examples environments.

engine.wait_for(:alpha)
  # will make the current thread block until a workitem is delivered
  # to the participant named 'alpha'

engine.wait_for('123432123-9043')
  # will make the current thread block until the process whose
  # wfid is given (String) terminates or produces an error.

engine.wait_for(5)
  # will make the current thread block until 5 messages have been
  # processed on the workqueue...

engine.wait_for(:empty)
  # will return as soon as the engine/storage is empty, ie as soon
  # as there are no more processes running in the engine (no more
  # expressions placed in the storage)

It’s OK to wait for multiple wfids :

engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')


# File 'lib/ruote/engine.rb', line 465

def wait_for(*items)

  logger = @context['s_logger']

  raise(
    "can't wait_for, there is no logger that responds to that call"
  ) unless logger.respond_to?(:wait_for)

  logger.wait_for(items)
end

#worker ⇒ Object

Returns the worker nested inside this engine (passed at initialization). Returns nil if this engine is only linked to a storage (and the worker is running somewhere else (hopefully)).



# File 'lib/ruote/engine.rb', line 96

def worker

  @context.worker
end