Class: Conductor::Coordinator
- Inherits: Object
- Defined in: lib/nf-conductor/coordinator/coordinator.rb
Instance Attribute Summary
- #max_thread_count ⇒ Object
  Returns the value of attribute max_thread_count.
- #polling_timers ⇒ Object
  Returns the value of attribute polling_timers.
- #workers ⇒ Object
  Returns the value of attribute workers.
Instance Method Summary
- #initialize(workers, max_thread_count: 5) ⇒ Coordinator (constructor)
  Create a new Coordinator for a certain set of Workers.
- #poll_for_task(worker) ⇒ Object
  Executed once per execution interval by the worker's polling timer.
- #process_task(worker, task) ⇒ Object
  Acknowledges the Task in Conductor, then passes the Task to the Worker to execute.
- #run(execution_interval = 15) ⇒ Object
  Creates and executes a TimerTask for each Worker that the Coordinator has been instantiated with.
- #stop ⇒ Object
  Shuts down all polling_timers for the Coordinator.
- #update_task_with_retry(task_body, count) ⇒ Object
Constructor Details
#initialize(workers, max_thread_count: 5) ⇒ Coordinator
Create a new Coordinator for a certain set of Workers. A Worker is an implementation of the Worker Interface for a specific task.
  Conductor::Coordinator.new([Conductor::Worker.new('matt-1'), Conductor::Worker.new('matt-2')])
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 11
def initialize(workers, max_thread_count: 5)
  self.workers = workers
  self.polling_timers = []
  self.max_thread_count = max_thread_count
end
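The method bodies on this page only ever call `worker.task_type` and `worker.execute(task)`, so any object responding to those two methods should be usable as a Worker. The following is a minimal sketch under that assumption. The `EchoWorker` class, the `'COMPLETED'` status string, and the `:inputData` / `:outputData` keys are illustrative assumptions and are not confirmed by this page.

```ruby
# Hypothetical worker: the class name, the 'COMPLETED' status and the
# :inputData / :outputData keys are assumptions, not taken from nf-conductor.
class EchoWorker
  attr_reader :task_type

  def initialize(task_type)
    @task_type = task_type
  end

  # Receives the polled task body (a Hash) and returns a result Hash.
  def execute(task)
    { status: 'COMPLETED', outputData: { echoed: task[:inputData] } }
  end
end

workers = [EchoWorker.new('matt-1'), EchoWorker.new('matt-2')]
result = workers.first.execute({ taskId: 'abc', inputData: { n: 1 } })

# With the gem loaded, these workers would be handed to the coordinator:
#   coordinator = Conductor::Coordinator.new(workers, max_thread_count: 5)
```

Returning a plain Hash matters here because process_task merges the task identifiers into whatever `execute` returns.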
Instance Attribute Details
#max_thread_count ⇒ Object
Returns the value of attribute max_thread_count.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6
def max_thread_count
  @max_thread_count
end
#polling_timers ⇒ Object
Returns the value of attribute polling_timers.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6
def polling_timers
  @polling_timers
end
#workers ⇒ Object
Returns the value of attribute workers.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 6
def workers
  @workers
end
Instance Method Details
#poll_for_task(worker) ⇒ Object
Executed once per execution interval by the worker's polling timer. Intended to batch-poll the Conductor task queue for the given worker's task type and execute as many tasks concurrently as possible using a CachedThreadPool (see ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html). The source shown here polls a single task per call and processes it inline; bulk polling is still marked as a TODO.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 43
def poll_for_task(worker)
  # TODO bulk poll for task, concurrently, up to size of queue
  tasks = [Conductor::Tasks.poll_task(worker.task_type)]
  tasks.each do |task|
    next if task[:status] != 200
    process_task(worker, task[:body])
  end
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to poll worker (#{worker.task_type}) with error #{e.message}")
end
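The description above mentions running polled tasks concurrently, while the listed source handles one task at a time. As a rough illustration of what bounded concurrent processing could look like, here is a sketch that uses only the Ruby standard library (a `Queue` and plain threads) rather than the CachedThreadPool from concurrent-ruby. The `process_concurrently` helper and the sample responses are hypothetical; only the `:status` / `:body` response shape is taken from the source.

```ruby
# Hypothetical helper, stdlib only: runs the block for every task using at
# most max_threads threads. A plain Queue plus Thread stands in for the
# thread pool mentioned in the description.
def process_concurrently(tasks, max_threads)
  queue = Queue.new
  tasks.each { |task| queue << task }
  queue.close # pop returns nil once the closed queue is drained

  thread_count = [max_threads, tasks.size].min
  threads = Array.new(thread_count) do
    Thread.new do
      while (task = queue.pop)
        yield task
      end
    end
  end
  threads.each(&:join)
end

# Sample poll responses in the shape used by poll_for_task.
responses = [
  { status: 200, body: { taskId: 't1' } },
  { status: 204, body: nil },
  { status: 200, body: { taskId: 't2' } }
]
polled = responses.select { |r| r[:status] == 200 }.map { |r| r[:body] }

processed = Queue.new
process_concurrently(polled, 5) { |task| processed << task[:taskId] }
processed_ids = Array.new(processed.size) { processed.pop }.sort
```

Capping the thread count at `max_threads` mirrors the role `max_thread_count` appears to play in the constructor. An exception raised inside the block would surface at `join`, so a real implementation would want to rescue per task.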
#process_task(worker, task) ⇒ Object
Acknowledges the Task in Conductor, then passes the Task to the Worker to execute. Afterwards, updates the Task in Conductor with the resulting status and output data. If any step raises, the Task is reported to Conductor as FAILED.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 56
def process_task(worker, task)
  Rails.logger.info("Conductor::Coordinator : Processing task #{task}") if Conductor.config.verbose

  task_identifiers = {
    taskId: task[:taskId],
    workflowInstanceId: task[:workflowInstanceId]
  }

  # Acknowledge the task, so other pollers will not be able to see the task in Conductor's queues
  Conductor::Tasks.acknowledge_task(task[:taskId])

  # Execute the task with the implementing application's worker
  result = worker.execute(task)
  task_body = result.merge!(task_identifiers)

  # Update Conductor about the result of the task
  update_task_with_retry(task_body, 0)
rescue => e
  Rails.logger.debug("Conductor::Coordinator : Failed to process task (#{task}) with error #{e.message} at location #{e.backtrace}")
  update_task_with_retry({ status: 'FAILED' }.merge(task_identifiers), 0)
end
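The two update bodies built in process_task can be reproduced in isolation to see what is sent back. This sketch uses stand-in data and makes no Conductor calls; the `'COMPLETED'` status and the `:inputData` key are assumptions, while `'FAILED'` and the two identifier keys come from the source.

```ruby
# Stand-in data; nothing here talks to Conductor.
task = { taskId: 't1', workflowInstanceId: 'w1', inputData: { n: 2 } }
task_identifiers = {
  taskId: task[:taskId],
  workflowInstanceId: task[:workflowInstanceId]
}

# Happy path: the worker result gains the identifiers. On key collisions the
# identifiers win, because merge! takes values from its argument.
result = { status: 'COMPLETED', taskId: 'overwritten-by-merge' }
task_body = result.merge!(task_identifiers)

# Failure path: a minimal body that only reports the failure.
failure_body = { status: 'FAILED' }.merge(task_identifiers)
```

Note that `merge!` mutates the Hash returned by the worker, so a worker that reuses or freezes its result object would be affected.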
#run(execution_interval = 15) ⇒ Object
Creates and executes a TimerTask for each Worker that the Coordinator has been instantiated with. See ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 19
def run(execution_interval=15)
  self.workers.each do |worker|
    polling_timer = Concurrent::TimerTask.new(execution_interval: execution_interval) do
      Rails.logger.info("Conductor::Coordinator : Worker (#{worker.task_type}) polling...") if Conductor.config.verbose
      poll_for_task(worker)
    end

    self.polling_timers << polling_timer
    polling_timer.execute
  end
end
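To make the run/stop lifecycle concrete without depending on concurrent-ruby, the sketch below swaps in a small stdlib-only timer. `PollingLoop` is a hypothetical stand-in, not the real Concurrent::TimerTask; it only imitates the two calls used above (`execute` and `shutdown`) and waits one interval before the first run.

```ruby
# Hypothetical stdlib-only stand-in for Concurrent::TimerTask: waits
# `interval` seconds, runs the block, and repeats until shutdown is called.
class PollingLoop
  def initialize(interval, &block)
    @interval = interval
    @block = block
    @stopped = false
    @mutex = Mutex.new
    @wakeup = ConditionVariable.new
  end

  def execute
    @thread = Thread.new do
      until wait_or_stop
        begin
          @block.call
        rescue StandardError
          # A failing poll must not kill the loop; real code would log here.
        end
      end
    end
    self
  end

  def shutdown
    @mutex.synchronize do
      @stopped = true
      @wakeup.signal
    end
    @thread&.join
  end

  private

  # Sleeps for up to one interval (less on shutdown); true once stopped.
  def wait_or_stop
    @mutex.synchronize do
      @wakeup.wait(@mutex, @interval) unless @stopped
      @stopped
    end
  end
end

# One timer per worker, as in run; shutting them all down, as in stop.
ticks = Queue.new
timers = %w[matt-1 matt-2].map do |task_type|
  PollingLoop.new(0.01) { ticks << task_type }.execute
end
sleep 0.2
timers.each(&:shutdown)
ticks_after_stop = ticks.size
```

Each worker gets its own timer, so a slow poll for one task type does not delay polling for the others.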
#stop ⇒ Object
Shuts down all polling_timers for the Coordinator. Workers will no longer poll for new Tasks.
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 32
def stop
  self.polling_timers.each do |polling_timer|
    polling_timer.shutdown
  end
  self.polling_timers = []
end
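#update_task_with_retry(task_body, count) ⇒ Object
Updates the Task in Conductor, retrying when the update raises. After three failed attempts it returns without raising.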
# File 'lib/nf-conductor/coordinator/coordinator.rb', line 78
def update_task_with_retry(task_body, count)
  # Put this in a retryable block instead
  begin
    return if count >= 3
    Conductor::Tasks.update_task(task_body)
  rescue
    update_task_with_retry(task_body, count+1)
  end
end
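The comment in the source suggests moving this logic into a retryable block. One possible shape for that, using Ruby's built-in `retry` keyword instead of recursion, is sketched here. The `with_retries` helper is hypothetical, and the simulated failures stand in for `Conductor::Tasks.update_task` raising.

```ruby
# Hypothetical helper: calls the block up to max_attempts times and returns
# true on the first success, or false once every attempt has raised.
def with_retries(max_attempts)
  attempts = 0
  begin
    attempts += 1
    yield attempts
    true
  rescue StandardError
    retry if attempts < max_attempts
    false
  end
end

# Simulated update that fails twice before succeeding.
calls = 0
succeeded = with_retries(3) do
  calls += 1
  raise 'Conductor unavailable' if calls < 3
end

# Simulated update that never succeeds: stops after three attempts.
failed_calls = 0
exhausted = with_retries(3) do
  failed_calls += 1
  raise 'still down'
end
```

Unlike the recursive version, this variant reports whether the update eventually succeeded, so a caller could log or escalate when all attempts fail.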