Class: Conductor::Coordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/nf-conductor/coordinator/coordinator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workers, max_thread_count: 5) ⇒ Coordinator

Create a new Coordinator for a certain set of Workers. A Worker is an implementation of the Worker Interface for a specific task. Example: Conductor::Coordinator.new([Conductor::Worker.new('matt-1'), Conductor::Worker.new('matt-2')])



11
12
13
14
15
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 11

# Sets up a Coordinator for the given workers. Only instance state is
# assigned here; no timers are created and nothing is polled.
#
# @param workers [Array] worker objects this coordinator will poll on behalf of
# @param max_thread_count [Integer] stored on the instance (defaults to 5)
def initialize(workers, max_thread_count: 5)
  self.max_thread_count = max_thread_count
  self.polling_timers = []
  self.workers = workers
end

Instance Attribute Details

#max_thread_count ⇒ Object

Returns the value of attribute max_thread_count.



6
7
8
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6

# Reader for the max_thread_count attribute.
#
# @return [Object, nil] the value held in @max_thread_count, or nil if it
#   has not been assigned
def max_thread_count
  @max_thread_count
end

#polling_timers ⇒ Object

Returns the value of attribute polling_timers.



6
7
8
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6

# Reader for the polling_timers attribute.
#
# @return [Object, nil] the value held in @polling_timers, or nil if it
#   has not been assigned
def polling_timers
  @polling_timers
end

#workers ⇒ Object

Returns the value of attribute workers.



6
7
8
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6

# Reader for the workers attribute.
#
# @return [Object, nil] the value held in @workers, or nil if it has not
#   been assigned
def workers
  @workers
end

Instance Method Details

#poll_for_task(worker) ⇒ Object

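Executed once every x seconds based on the parent polling_timer. Polls the Conductor task queue for the given worker's task type and processes the returned task when the poll responds with status 200. The source currently polls a single task per tick and processes it synchronously; batch polling and executing as many tasks concurrently as possible with a CachedThreadPool (see ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html) is still marked as a TODO.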



43
44
45
46
47
48
49
50
51
52
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 43

# Polls Conductor once for the worker's task type and forwards the response
# body to #process_task when the response status is 200. Any StandardError
# raised while polling or processing is logged at debug level and swallowed.
#
# @param worker [#task_type] worker whose task type is polled
# @return [Object] the one-element list of poll responses, or the logger's
#   return value when an error was rescued
def poll_for_task(worker)
  # TODO bulk poll for task, concurrently, up to size of queue
  responses = [Conductor::Tasks.poll_task(worker.task_type)]
  responses.each do |response|
    process_task(worker, response[:body]) if response[:status] == 200
  end
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to poll worker (#{worker.task_type}) with error #{e.message}")
end

#process_task(worker, task) ⇒ Object

Acknowledges the Task in Conductor, then passes the Task to the Worker to execute. Updates the Task in Conductor with status and output data.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 56

# Acknowledges the task in Conductor, runs it through the worker, and sends
# the worker's result (plus the task identifiers) back to Conductor.
#
# On any StandardError the failure is logged at debug level. If the task's
# identifiers had already been read, the task is then reported to Conductor
# with status 'FAILED'; otherwise there is nothing to report against and the
# method returns nil.
#
# @param worker [#execute] receives the task; expected to return a Hash,
#   since the result is merged with the task identifiers
# @param task [Hash] task payload; :taskId and :workflowInstanceId are read
# @return [Object, nil] the return value of #update_task_with_retry, or nil
#   when the failure happened before the identifiers could be read
def process_task(worker, task)
  # Set explicitly so the rescue clause can tell whether the identifiers
  # were ever read from the task.
  task_identifiers = nil

  Rails.logger.info("Conductor::Coordinator : Processing task #{task}") if Conductor.config.verbose

  task_identifiers = {
    taskId: task[:taskId],
    workflowInstanceId: task[:workflowInstanceId]
  }

  # Acknowledge the task, so other pollers will not be able to see the task in Conductor's queues
  Conductor::Tasks.acknowledge_task(task[:taskId])

  # Execute the task with the implementing application's worker
  result = worker.execute(task)

  # Non-destructive merge: the worker's hash is left untouched, and a frozen
  # result no longer turns a successful execution into a FAILED task.
  task_body = result.merge(task_identifiers)

  # Update Conductor about the result of the task
  update_task_with_retry(task_body, 0)
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to process task (#{task}) with error #{e.message} at location #{e.backtrace}")

  # Merging nil here would raise from inside the error handler itself.
  return unless task_identifiers

  update_task_with_retry({ status: 'FAILED' }.merge(task_identifiers), 0)
end

#run(execution_interval = 15) ⇒ Object

Creates and executes a TimerTask for each Worker that the Coordinator has been instantiated with. See ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 19

# Builds one Concurrent::TimerTask per worker, records it in polling_timers,
# and starts it. Each timer's block polls for that worker's tasks via
# #poll_for_task, logging first when Conductor.config.verbose is set.
#
# @param execution_interval [Numeric] handed to every TimerTask as its
#   execution_interval (defaults to 15)
# @return [Object] the workers collection, as returned by #each
def run(execution_interval=15)
  workers.each do |worker|
    timer = Concurrent::TimerTask.new(execution_interval: execution_interval) do
      if Conductor.config.verbose
        Rails.logger.info("Conductor::Coordinator : Worker (#{worker.task_type}) polling...")
      end
      poll_for_task(worker)
    end

    polling_timers.push(timer)
    timer.execute
  end
end

#stop ⇒ Object

Shuts down all polling_timers for the Coordinator. Workers will no longer poll for new Tasks.



32
33
34
35
36
37
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 32

# Calls #shutdown on every timer in polling_timers, then replaces the list
# with a fresh empty array.
#
# @return [Array] the new, empty polling_timers list
def stop
  polling_timers.each(&:shutdown)
  self.polling_timers = []
end

#update_task_with_retry(task_body, count) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 78

# Sends a task update to Conductor, retrying immediately on StandardError
# until the attempt counter reaches 3. When every attempt fails, the last
# error is logged so the lost update leaves a trace, and nil is returned.
#
# @param task_body [Hash] payload passed to Conductor::Tasks.update_task
# @param count [Integer] number of attempts already used; callers in this
#   file pass 0, giving up to three attempts
# @return [Object, nil] the return value of Conductor::Tasks.update_task on
#   success; nil when no attempt was made or all attempts failed
def update_task_with_retry(task_body, count)
  max_attempts = 3
  last_error = nil

  while count < max_attempts
    begin
      return Conductor::Tasks.update_task(task_body)
    rescue => e
      last_error = e
      count += 1
    end
  end

  # last_error is nil when count was already at the limit and nothing was tried.
  if last_error
    Rails.logger.error("Conductor::Coordinator : Giving up updating task (#{task_body}) with error #{last_error.message}")
  end
  nil
end