Class: Taskinator::Process::Concurrent

Inherits:
Taskinator::Process show all
Defined in:
lib/taskinator/process.rb

Overview


Instance Attribute Summary collapse

Attributes inherited from Taskinator::Process

#created_at, #definition, #options, #parent, #queue, #scope, #updated_at, #uuid

Instance Method Summary collapse

Methods inherited from Taskinator::Process

#<=>, #cancel!, #complete!, define_concurrent_process_for, define_sequential_process_for, #enqueue!, #fail!, #no_tasks_defined?, #pause!, #resume!, #start!, #task_failed, #tasks, #to_s

Methods included from Instrumentation

#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload

Methods included from Taskinator::Persistence

add_process_to_list, deserialize, included, processes_list_key, serialize

Methods included from Workflow

#current_state, #current_state=, #transition

Constructor Details

#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent

Returns a new instance of Concurrent.



234
235
236
237
238
239
# File 'lib/taskinator/process.rb', line 234

# Builds a concurrent process.
#
# definition  - forwarded unchanged to the superclass initializer.
# complete_on - stored as-is and exposed through #complete_on.
# options     - forwarded to the superclass initializer; afterwards the
#               :concurrency_method key is removed from this very hash
#               (the hash passed in is mutated) and kept separately,
#               falling back to :thread when it is absent or nil.
#
# Emits a deprecation warning when the :fork concurrency method is chosen.
def initialize(definition, complete_on = CompleteOn::Default, options = {})
  super(definition, options)

  @complete_on = complete_on
  @concurrency_method = options.delete(:concurrency_method) || :thread
  return unless @concurrency_method == :fork

  warn("[DEPRECATED]: concurrency_method will be removed in a future version.")
end

Instance Attribute Details

#complete_on ⇒ Object (readonly)

Returns the value of attribute complete_on.



229
230
231
# File 'lib/taskinator/process.rb', line 229

# Read-only accessor: returns whatever is held in @complete_on, which the
# constructor assigns from its complete_on argument.
def complete_on
  @complete_on
end

#concurrency_method ⇒ Object (readonly)

DEPRECATED: concurrency_method will be removed in a future version.



232
233
234
# File 'lib/taskinator/process.rb', line 232

# @deprecated concurrency_method will be removed in a future version.
#
# Read-only accessor: returns whatever is held in @concurrency_method. The
# constructor sets it from the :concurrency_method option, using :thread
# when that option is not supplied.
def concurrency_method
  @concurrency_method
end

Instance Method Details

#accept(visitor) ⇒ Object



298
299
300
301
# File 'lib/taskinator/process.rb', line 298

# Presents this process to a visitor: the superclass implementation runs
# first with the same visitor, then the complete_on attribute is reported
# through visit_attribute_enum together with the CompleteOn type.
def accept(visitor)
  super(visitor)
  visitor.visit_attribute_enum(:complete_on, CompleteOn)
end

#enqueue ⇒ Object



241
242
243
244
245
246
247
248
# File 'lib/taskinator/process.rb', line 241

# Enqueues every task belonging to this process.
#
# A process without tasks has nothing to wait for, so it is completed on the
# spot. Otherwise the number of tasks is logged and enqueue! is called on
# each task in turn.
def enqueue
  # weren't any tasks to start with
  return complete! if tasks.empty?

  Taskinator.logger.info("Enqueuing #{tasks.count} tasks for process '#{uuid}'.")
  tasks.each(&:enqueue!)
end

#inspect ⇒ Object



303
304
305
# File 'lib/taskinator/process.rb', line 303

def inspect
  %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", state=:#{current_state}, complete_on=:#{complete_on}, tasks=[#{tasks.inspect}]>)
end

#start ⇒ Object

This method is only called in-process (usually from the console).



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/taskinator/process.rb', line 251

# Runs every task of this process directly, without queueing, and blocks
# until all of them have finished. Only called in-process (usually from the
# console).
#
# With no tasks the process is completed straight away. Otherwise each task
# is started in its own thread, or in its own forked child process when the
# deprecated :fork concurrency method has been selected.
#
# Returns the result of complete! for an empty process, the result of
# Process.waitall on the fork path, and nil on the thread path.
def start
  if tasks.empty?
    complete! # weren't any tasks to start with
  elsif concurrency_method == :fork
    # :nocov:
    warn("[DEPRECATED]: concurrency_method will be removed in a future version.")
    tasks.each do |task|
      fork do
        task.start!
      end
    end
    Process.waitall
    # :nocov:
  else
    threads = tasks.map do |task|
      Thread.new { task.start! }
    end

    # Wait with plain Thread#join instead of ThreadsWait: the thwait library
    # stopped shipping with Ruby in 2.7, so relying on it needs an extra gem.
    threads.each do |thread|
      begin
        thread.join
      rescue StandardError
        # Thread#join re-raises the error that ended the thread.
        # ThreadsWait.all_waits never passed such errors on to the caller, so
        # keep that behaviour and carry on waiting for the remaining threads.
        next
      end
    end

    nil # the value ThreadsWait.all_waits used to return here
  end
end

#task_completed(task) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/taskinator/process.rb', line 276

# Callback for a task of this process having completed.
#
# Decrements the count of pending concurrent tasks, logs the new count and
# then decides whether the process as a whole is finished:
# * CompleteOn::First - complete unless the process is already completed, so
#   subsequent task completions are ignored;
# * otherwise         - complete once fewer than one task is pending.
#
# The task argument itself is not used.
def task_completed(task)
  remaining = deincr_pending_tasks
  Taskinator.logger.info("Completed task for process '#{uuid}'. Pending is #{remaining}.")

  finished =
    if complete_on == CompleteOn::First
      !completed?
    else
      remaining < 1
    end

  complete! if finished
end

#tasks_completed? ⇒ Boolean

Returns:

  • (Boolean)


290
291
292
293
294
295
296
# File 'lib/taskinator/process.rb', line 290

# Whether enough tasks have completed for this process to count as done.
#
# Under CompleteOn::First a single completed task is sufficient; for any
# other setting the decision is left to the superclass implementation (all).
def tasks_completed?
  return super unless complete_on == CompleteOn::First

  tasks.any?(&:completed?)
end