Class: Ruote::Worker
- Inherits:
-
Object
- Object
- Ruote::Worker
- Defined in:
- lib/ruote/worker.rb
Overview
Workers fetch ‘msgs’ and ‘schedules’ from the storage and process them.
Read more at ruote.rubyforge.org/configuration.html
Constant Summary collapse
- EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]
- PROC_ACTIONS =
‘apply’ is comprised in ‘launch’ ‘receive’ is a ParticipantExpression alias for ‘reply’
%w[ cancel kill pause resume ].collect { |a| a + '_process' }
- DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#run_thread ⇒ Object
readonly
Returns the value of attribute run_thread.
-
#running ⇒ Object
readonly
Returns the value of attribute running.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary collapse
-
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, ie if all the process instances are terminated or are stuck in an error.
-
#initialize(storage) ⇒ Worker
constructor
Given a storage, creates a new instance of a Worker.
-
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
-
#run ⇒ Object
Runs the worker in the current thread.
-
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
-
#shutdown(join = true) ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
-
#subscribe(actions, subscriber) ⇒ Object
Loggers and trackers call this method when subscribing for events / actions in this worker.
Constructor Details
#initialize(storage) ⇒ Worker
Given a storage, creates a new instance of a Worker.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/ruote/worker.rb', line 52 def initialize(storage) @subscribers = [] # must be ready before the storage is created # services like Logger to subscribe to the worker @storage = storage @context = Ruote::Context.new(storage, self) @last_time = Time.at(0.0).utc # 1970... @running = true @run_thread = nil @msgs = [] @sleep_time = 0.000 end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
45 46 47 |
# File 'lib/ruote/worker.rb', line 45 def context @context end |
#run_thread ⇒ Object (readonly)
Returns the value of attribute run_thread.
47 48 49 |
# File 'lib/ruote/worker.rb', line 47 def run_thread @run_thread end |
#running ⇒ Object (readonly)
Returns the value of attribute running.
48 49 50 |
# File 'lib/ruote/worker.rb', line 48 def running @running end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
44 45 46 |
# File 'lib/ruote/worker.rb', line 44 def storage @storage end |
Instance Method Details
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, ie if all the process instances are terminated or are stuck in an error.
NOTE : for now, if a branch of a process is in error while another is still running, this method will consider the process instance inactive (and it will return true if all the processes are considered inactive).
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/ruote/worker.rb', line 130 def inactive? # the cheaper tests first return false if @msgs.size > 0 return false unless @context.storage.empty?('schedules') return false unless @context.storage.empty?('msgs') wfids = @context.storage.get_many('expressions').collect { |exp| exp['fei']['wfid'] } error_wfids = @context.storage.get_many('errors').collect { |err| err['fei']['wfid'] } (wfids - error_wfids == []) end |
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
93 94 95 96 |
# File 'lib/ruote/worker.rb', line 93 def join @run_thread.join if @run_thread end |
#run ⇒ Object
Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.
73 74 75 76 |
# File 'lib/ruote/worker.rb', line 73 def run step while @running end |
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
80 81 82 83 84 85 86 87 88 |
# File 'lib/ruote/worker.rb', line 80 def run_in_thread Thread.abort_on_exception = true # TODO : remove me at some point @running = true @run_thread = Thread.new { run } end |
#shutdown(join = true) ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/ruote/worker.rb', line 109 def shutdown(join=true) @running = false if join begin @run_thread.join rescue Exception => e end else sleep(3) end end |
#subscribe(actions, subscriber) ⇒ Object
Loggers and trackers call this method when subscribing for events / actions in this worker.
101 102 103 104 |
# File 'lib/ruote/worker.rb', line 101 def subscribe(actions, subscriber) @subscribers << [ actions, subscriber ] end |