Class: Taskinator::Process::Concurrent
- Inherits:
-
Taskinator::Process
- Object
- Taskinator::Process
- Taskinator::Process::Concurrent
- Defined in:
- lib/taskinator/process.rb
Overview
Instance Attribute Summary collapse
-
#complete_on ⇒ Object
readonly
Returns the value of attribute complete_on.
-
#concurrency_method ⇒ Object
readonly
DEPRECATED: concurrency_method will be removed in a future version.
Attributes inherited from Taskinator::Process
#created_at, #definition, #options, #parent, #queue, #scope, #updated_at, #uuid
Instance Method Summary collapse
- #accept(visitor) ⇒ Object
- #enqueue ⇒ Object
-
#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent
constructor
A new instance of Concurrent.
- #inspect ⇒ Object
-
#start ⇒ Object
this method only called in-process (usually from the console).
- #task_completed(task) ⇒ Object
- #tasks_completed? ⇒ Boolean
Methods inherited from Taskinator::Process
#<=>, #cancel!, #complete!, define_concurrent_process_for, define_sequential_process_for, #enqueue!, #fail!, #no_tasks_defined?, #pause!, #resume!, #start!, #task_failed, #tasks, #to_s
Methods included from Instrumentation
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
Methods included from Taskinator::Persistence
add_process_to_list, deserialize, included, processes_list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent
Returns a new instance of Concurrent.
234 235 236 237 238 239 |
# File 'lib/taskinator/process.rb', line 234 def initialize(definition, complete_on=CompleteOn::Default, ={}) super(definition, ) @complete_on = complete_on @concurrency_method = .delete(:concurrency_method) || :thread warn("[DEPRECATED]: concurrency_method will be removed in a future version.") if @concurrency_method == :fork end |
Instance Attribute Details
#complete_on ⇒ Object (readonly)
Returns the value of attribute complete_on.
229 230 231 |
# File 'lib/taskinator/process.rb', line 229 def complete_on @complete_on end |
#concurrency_method ⇒ Object (readonly)
DEPRECATED: concurrency_method will be removed in a future version.
232 233 234 |
# File 'lib/taskinator/process.rb', line 232 def concurrency_method @concurrency_method end |
Instance Method Details
#accept(visitor) ⇒ Object
298 299 300 301 |
# File 'lib/taskinator/process.rb', line 298 def accept(visitor) super visitor.visit_attribute_enum(:complete_on, CompleteOn) end |
#enqueue ⇒ Object
241 242 243 244 245 246 247 248 |
# File 'lib/taskinator/process.rb', line 241 def enqueue if tasks.empty? complete! # weren't any tasks to start with else Taskinator.logger.info("Enqueuing #{tasks.count} tasks for process '#{uuid}'.") tasks.each(&:enqueue!) end end |
#inspect ⇒ Object
303 304 305 |
# File 'lib/taskinator/process.rb', line 303 def inspect %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", state=:#{current_state}, complete_on=:#{complete_on}, tasks=[#{tasks.inspect}]>) end |
#start ⇒ Object
this method only called in-process (usually from the console)
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/taskinator/process.rb', line 251 def start if tasks.empty? complete! # weren't any tasks to start with else if concurrency_method == :fork # :nocov: warn("[DEPRECATED]: concurrency_method will be removed in a future version.") tasks.each do |task| fork do task.start! end end Process.waitall # :nocov: else threads = tasks.map do |task| Thread.new do task.start! end end ThreadsWait.all_waits(*threads) end end end |
#task_completed(task) ⇒ Object
276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/taskinator/process.rb', line 276 def task_completed(task) # decrement the count of pending concurrent tasks pending = deincr_pending_tasks Taskinator.logger.info("Completed task for process '#{uuid}'. Pending is #{pending}.") # when complete on first, then don't bother with subsequent tasks completing if complete_on == CompleteOn::First complete! unless completed? else complete! if pending < 1 end end |
#tasks_completed? ⇒ Boolean
290 291 292 293 294 295 296 |
# File 'lib/taskinator/process.rb', line 290 def tasks_completed? if complete_on == CompleteOn::First tasks.any?(&:completed?) else super # all end end |