Class: Taskinator::Process::Concurrent

Inherits:
Taskinator::Process show all
Defined in:
lib/taskinator/process.rb

Overview


Instance Attribute Summary collapse

Attributes inherited from Taskinator::Process

#created_at, #definition, #options, #parent, #queue, #updated_at, #uuid

Instance Method Summary collapse

Methods inherited from Taskinator::Process

#<=>, base_key, #cancel!, #complete!, define_concurrent_process_for, define_sequential_process_for, #enqueue!, #fail!, #no_tasks_defined?, #pause!, #resume!, #start!, #task_failed, #tasks, #to_s

Methods included from Instrumentation

#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload

Methods included from Taskinator::Persistence

add_process_to_list, deserialize, included, list_key, serialize

Methods included from Workflow

#current_state, #current_state=, #transition

Constructor Details

#initialize(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Concurrent

Returns a new instance of Concurrent.



211
212
213
214
215
# File 'lib/taskinator/process.rb', line 211

# Builds a concurrent process for the given definition.
#
# definition  - handed unchanged to the superclass constructor.
# complete_on - completion policy, stored as given (CompleteOn::Default
#               when omitted).
# options     - Hash handed to the superclass constructor; its
#               :concurrency_method entry, if any, selects how tasks are
#               run and falls back to :thread.
def initialize(definition, complete_on = CompleteOn::Default, options = {})
  super(definition, options)

  @complete_on = complete_on

  # NOTE(review): delete removes the key from the caller's hash, and it runs
  # after that same hash object was handed to super -- confirm both effects
  # are intended.
  requested_method = options.delete(:concurrency_method)
  @concurrency_method = requested_method || :thread
end

Instance Attribute Details

#complete_on ⇒ Object (readonly)

Returns the value of attribute complete_on.



208
209
210
# File 'lib/taskinator/process.rb', line 208

# Completion policy supplied to the constructor. It is compared against
# CompleteOn::First when deciding whether the tasks are completed.
def complete_on
  @complete_on
end

#concurrency_method ⇒ Object (readonly)

Returns the value of attribute concurrency_method.



209
210
211
# File 'lib/taskinator/process.rb', line 209

# How #start runs the tasks: :fork starts each task in a forked child
# process, any other value starts each task in its own thread. Set by the
# constructor from options[:concurrency_method], defaulting to :thread.
def concurrency_method
  @concurrency_method
end

Instance Method Details

#accept(visitor) ⇒ Object



272
273
274
275
# File 'lib/taskinator/process.rb', line 272

# Visits this process: defers to the superclass implementation first, then
# reports complete_on to the visitor as an enum attribute of CompleteOn.
def accept(visitor)
  super
  visitor.visit_attribute_enum(:complete_on, CompleteOn)
end

#enqueue ⇒ Object



217
218
219
220
221
222
223
# File 'lib/taskinator/process.rb', line 217

# Enqueues every task of this process. A process without tasks is
# completed straight away instead.
def enqueue
  # weren't any tasks to start with
  return complete! if tasks.empty?

  tasks.each { |task| task.enqueue! }
end

#inspect ⇒ Object



277
278
279
# File 'lib/taskinator/process.rb', line 277

# Human-readable summary: class name, object id in hex, uuid, current
# state, completion policy and the inspected tasks.
def inspect
  identity = "#{self.class.name}:0x#{__id__.to_s(16)}"
  fields = [
    %(uuid="#{uuid}"),
    "state=:#{current_state}",
    "complete_on=:#{complete_on}",
    "tasks=[#{tasks.inspect}]"
  ]
  "#<#{identity} #{fields.join(', ')}>"
end

#start ⇒ Object

This method is only called in-process (usually from the console).



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/taskinator/process.rb', line 226

# Runs every task of this process and waits for all of them to finish.
# This method is only called in-process (usually from the console).
#
# A process without tasks is completed straight away. Otherwise each task
# is started in its own forked child process when concurrency_method is
# :fork, or in its own thread for any other value.
def start
  if tasks.empty?
    complete! # weren't any tasks to start with
  elsif concurrency_method == :fork
    tasks.each do |task|
      fork { task.start! }
    end
    # Root-qualified on purpose: this class is Taskinator::Process::Concurrent,
    # so a bare `Process` can resolve to Taskinator::Process rather than
    # Ruby's core Process module, which is the one that provides waitall.
    ::Process.waitall
  else
    threads = tasks.map do |task|
      Thread.new { task.start! }
    end
    # NOTE(review): ThreadsWait comes from the `thwait` library, which is no
    # longer part of the default standard library on newer Rubies -- confirm
    # it is required and declared as a dependency.
    ThreadsWait.all_waits(*threads)
  end
end

#task_completed(task) ⇒ Object



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/taskinator/process.rb', line 248

# Callback invoked when a task finishes; completes the process once
# tasks_completed? reports true.
#
# task - the task that finished (not consulted by this implementation).
def task_completed(task)
  # when complete on first, then don't bother with subsequent tasks completing
  return if completed?
  return if failed?

  # nothing to do until the completion condition is met
  return unless tasks_completed?

  # prevent re-entrance so that two tasks completing
  # simultaneously can't complete the process twice,
  # which enqueues/starts the same subsequent task
  Taskinator.redis_mutex(uuid) do
    # double check, since the status may have
    # changed while waiting in the mutex
    complete! if tasks_completed?
  end
end

#tasks_completed? ⇒ Boolean

Returns:

  • (Boolean)


264
265
266
267
268
269
270
# File 'lib/taskinator/process.rb', line 264

# Whether the completion condition is met. With CompleteOn::First a single
# completed task is enough; every other policy defers to the superclass.
def tasks_completed?
  return super unless complete_on == CompleteOn::First # all

  tasks.any? { |task| task.completed? }
end