Class: Taskinator::Process::Concurrent

Inherits:
Taskinator::Process show all
Defined in:
lib/taskinator/process.rb

Overview


Instance Attribute Summary collapse

Attributes inherited from Taskinator::Process

#created_at, #definition, #options, #parent, #queue, #scope, #updated_at, #uuid

Instance Method Summary collapse

Methods inherited from Taskinator::Process

#<=>, #cancel!, #complete!, define_concurrent_process_for, define_sequential_process_for, #enqueue!, #fail!, #no_tasks_defined?, #pause!, #resume!, #start!, #task_failed, #tasks, #to_s

Methods included from Instrumentation

#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload

Methods included from Taskinator::Persistence

add_process_to_list, deserialize, included, processes_list_key, serialize

Methods included from Workflow

#current_state, #current_state=, #transition

Constructor Details

#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent

Returns a new instance of Concurrent.



227
228
229
230
231
# File 'lib/taskinator/process.rb', line 227

def initialize(definition, complete_on=CompleteOn::Default, options={})
  super(definition, options)
  @complete_on = complete_on
  @concurrency_method = options.delete(:concurrency_method) || :thread
end

Instance Attribute Details

#complete_onObject (readonly)

Returns the value of attribute complete_on.



224
225
226
# File 'lib/taskinator/process.rb', line 224

def complete_on
  @complete_on
end

#concurrency_methodObject (readonly)

Returns the value of attribute concurrency_method.



225
226
227
# File 'lib/taskinator/process.rb', line 225

def concurrency_method
  @concurrency_method
end

Instance Method Details

#accept(visitor) ⇒ Object



289
290
291
292
# File 'lib/taskinator/process.rb', line 289

def accept(visitor)
  super
  visitor.visit_attribute_enum(:complete_on, CompleteOn)
end

#enqueueObject



233
234
235
236
237
238
239
240
241
# File 'lib/taskinator/process.rb', line 233

def enqueue
  if tasks.empty?
    complete! # weren't any tasks to start with
  else
    Taskinator.statsd_client.count("taskinator.#{definition.name.underscore.parameterize}.pending", tasks.count)
    Taskinator.logger.info("Enqueuing #{tasks.count} tasks for process '#{uuid}'.")
    tasks.each(&:enqueue!)
  end
end

#inspectObject



294
295
296
# File 'lib/taskinator/process.rb', line 294

def inspect
  %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", state=:#{current_state}, complete_on=:#{complete_on}, tasks=[#{tasks.inspect}]>)
end

#startObject

this method only called in-process (usually from the console)



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/taskinator/process.rb', line 244

def start
  if tasks.empty?
    complete! # weren't any tasks to start with
  else
    if concurrency_method == :fork
      tasks.each do |task|
        fork do
          task.start!
        end
      end
      Process.waitall
    else
      threads = tasks.map do |task|
        Thread.new do
          task.start!
        end
      end
      ThreadsWait.all_waits(*threads)
    end
  end
end

#task_completed(task) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/taskinator/process.rb', line 266

def task_completed(task)
  # deincrement the count of pending concurrent tasks
  pending = deincr_pending_tasks

  Taskinator.statsd_client.count("taskinator.#{definition.name.underscore.parameterize}.pending", pending)
  Taskinator.logger.info("Completed task for process '#{uuid}'. Pending is #{pending}.")

  # when complete on first, then don't bother with subsequent tasks completing
  if (complete_on == CompleteOn::First)
    complete! unless completed?
  else
    complete! if pending < 1
  end
end

#tasks_completed?Boolean

Returns:

  • (Boolean)


281
282
283
284
285
286
287
# File 'lib/taskinator/process.rb', line 281

def tasks_completed?
  if (complete_on == CompleteOn::First)
    tasks.any?(&:completed?)
  else
    super # all
  end
end