Class: Taskinator::Process::Concurrent
- Inherits: Taskinator::Process
  - Object
  - Taskinator::Process
  - Taskinator::Process::Concurrent
- Defined in: lib/taskinator/process.rb
Instance Attribute Summary
- #complete_on ⇒ Object (readonly)
  Returns the value of attribute complete_on.
- #concurrency_method ⇒ Object (readonly)
  Returns the value of attribute concurrency_method.
Attributes inherited from Taskinator::Process
#created_at, #definition, #options, #parent, #queue, #updated_at, #uuid
Instance Method Summary
- #accept(visitor) ⇒ Object
- #enqueue ⇒ Object
- #initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent (constructor)
  Returns a new instance of Concurrent.
- #inspect ⇒ Object
- #start ⇒ Object
  This method is only called in-process (usually from the console).
- #task_completed(task) ⇒ Object
- #tasks_completed? ⇒ Boolean
Methods inherited from Taskinator::Process
#<=>, base_key, #cancel!, #complete!, define_concurrent_process_for, define_sequential_process_for, #enqueue!, #fail!, #no_tasks_defined?, #pause!, #resume!, #start!, #task_failed, #tasks, #to_s
Methods included from Instrumentation
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
Methods included from Taskinator::Persistence
add_process_to_list, deserialize, included, list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent
Returns a new instance of Concurrent.
# File 'lib/taskinator/process.rb', line 211
def initialize(definition, complete_on=CompleteOn::Default, options={})
  super(definition, options)
  @complete_on = complete_on
  @concurrency_method = options.delete(:concurrency_method) || :thread
end
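The constructor takes `:concurrency_method` out of the options hash and falls back to `:thread` when the key is absent. The following is a minimal sketch of that pattern only; `ConcurrentSketch`, the `:default` and `:first` symbols, and the `note:` key are hypothetical stand-ins, not part of Taskinator.

```ruby
# Hypothetical stand-in class; it does not inherit from Taskinator::Process.
class ConcurrentSketch
  attr_reader :complete_on, :concurrency_method, :options

  def initialize(complete_on = :default, options = {})
    @options = options
    @complete_on = complete_on
    # Hash#delete returns the removed value, or nil when the key is absent,
    # so the || supplies the :thread default.
    @concurrency_method = options.delete(:concurrency_method) || :thread
  end
end

default = ConcurrentSketch.new
forked  = ConcurrentSketch.new(:first, { concurrency_method: :fork, note: "illustrative" })

puts default.concurrency_method
puts forked.concurrency_method
puts forked.options.inspect
```

Because `Hash#delete` mutates its receiver, the hash passed in by the caller no longer contains `:concurrency_method` after construction.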
Instance Attribute Details
#complete_on ⇒ Object (readonly)
Returns the value of attribute complete_on.
# File 'lib/taskinator/process.rb', line 208
def complete_on
  @complete_on
end
#concurrency_method ⇒ Object (readonly)
Returns the value of attribute concurrency_method.
# File 'lib/taskinator/process.rb', line 209
def concurrency_method
  @concurrency_method
end
Instance Method Details
#accept(visitor) ⇒ Object
# File 'lib/taskinator/process.rb', line 272
def accept(visitor)
  super
  visitor.visit_attribute_enum(:complete_on, CompleteOn)
end
#enqueue ⇒ Object
# File 'lib/taskinator/process.rb', line 217
def enqueue
  if tasks.empty?
    complete! # weren't any tasks to start with
  else
    tasks.each(&:enqueue!)
  end
end
#inspect ⇒ Object
# File 'lib/taskinator/process.rb', line 277
def inspect
  %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", state=:#{current_state}, complete_on=:#{complete_on}, tasks=[#{tasks.inspect}]>)
end
#start ⇒ Object
This method is only called in-process (usually from the console).
# File 'lib/taskinator/process.rb', line 226
def start
  if tasks.empty?
    complete! # weren't any tasks to start with
  else
    if concurrency_method == :fork
      tasks.each do |task|
        fork do
          task.start!
        end
      end
      Process.waitall
    else
      threads = tasks.map do |task|
        Thread.new do
          task.start!
        end
      end
      ThreadsWait.all_waits(*threads)
    end
  end
end
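The thread branch of `#start` follows a common shape: spawn one thread per task, then block until all of them finish. A minimal sketch of that shape is shown here, assuming plain lambdas in place of tasks and `Thread#value` in place of `ThreadsWait` (which comes from the separate `thwait` library). `run_all_in_threads` is a hypothetical helper, not a Taskinator method.

```ruby
# Hypothetical helper mirroring the thread branch of #start, without Taskinator.
def run_all_in_threads(jobs)
  # One thread per job, as #start does with one thread per task.
  threads = jobs.map do |job|
    Thread.new { job.call }
  end
  # Thread#value joins the thread and returns the block's result,
  # so this line blocks until every job has finished.
  threads.map(&:value)
end

results = run_all_in_threads([-> { 1 + 1 }, -> { :done }])
puts results.inspect
```

Results come back in the order the jobs were supplied, regardless of which thread finishes first, because each thread is joined in turn.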
#task_completed(task) ⇒ Object
# File 'lib/taskinator/process.rb', line 248
def task_completed(task)
  # when complete on first, then don't bother with subsequent tasks completing
  return if completed? || failed?

  if tasks_completed?
    # prevent re-entrance so that two tasks completing
    # simultaneously can't complete the process twice,
    # which enqueues/starts the same subsequent task
    Taskinator.redis_mutex(uuid) do
      # double check, since the status may have
      # changed while waiting in the mutex
      complete! if tasks_completed?
    end
  end
end
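The comments in `#task_completed` describe a check, lock, re-check pattern. The sketch below illustrates that general pattern with an in-process `Mutex` standing in for `Taskinator.redis_mutex`; `CompletionSketch` and its `@completed` flag are hypothetical, and the sketch does not model how Taskinator tracks process state.

```ruby
# Hypothetical stand-in: an in-process Mutex replaces the Redis-backed mutex.
class CompletionSketch
  attr_reader :complete_calls

  def initialize
    @lock = Mutex.new
    @completed = false
    @complete_calls = 0
  end

  # Called by every finishing task.
  def task_completed
    return if @completed # cheap check outside the lock

    @lock.synchronize do
      # Re-check: another thread may have completed the process
      # while this one was waiting for the lock.
      next if @completed

      @completed = true
      @complete_calls += 1
    end
  end
end

sketch = CompletionSketch.new
8.times.map { Thread.new { sketch.task_completed } }.each(&:join)
puts sketch.complete_calls
```

However many threads report completion at once, the guarded section runs its completion step a single time.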
#tasks_completed? ⇒ Boolean
# File 'lib/taskinator/process.rb', line 264
def tasks_completed?
  if (complete_on == CompleteOn::First)
    tasks.any?(&:completed?)
  else
    super # all
  end
end
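The difference between the two completion modes can be sketched with plain Ruby collections. This example assumes, based on the `# all` comment, that the inherited check requires every task to be completed; `SketchTask`, `sketch_tasks_completed?`, and the `:first` / `:default` symbols are hypothetical stand-ins for the real task objects and `CompleteOn` constants.

```ruby
# Hypothetical task stand-in with a completed? predicate.
SketchTask = Struct.new(:done) do
  def completed?
    done
  end
end

def sketch_tasks_completed?(tasks, complete_on)
  if complete_on == :first
    tasks.any?(&:completed?) # one finished task is enough
  else
    tasks.all?(&:completed?) # assumption: the inherited check requires all tasks
  end
end

tasks = [SketchTask.new(true), SketchTask.new(false)]
first_result = sketch_tasks_completed?(tasks, :first)
all_result   = sketch_tasks_completed?(tasks, :default)
puts first_result
puts all_result
```

With one of two tasks finished, the complete-on-first mode reports completion while the default mode does not.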