Class: CobraCommander::Executor::WorkerPool
- Inherits: Object
  - Object
  - CobraCommander::Executor::WorkerPool
- Defined in: lib/cobra_commander/executor/worker_pool.rb
Overview
This WorkerPool queues up jobs and executes them using Workers, each with a thread running the work loop with the given runner.
- A job is defined by a group of arguments to be passed to the runner.
- A runner is an object that responds to #call(tty, *args), where tty is an instance of TTY::Command and *args are the arguments queued in the worker pool.
- The runner's call method must return an array of [status, output].
- A worker manages a thread running the job with runner.call and updates the job result and output.
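The runner contract described above can be sketched as follows. This is a minimal illustration, not the library's own runner: `FakeTTY`, `FakeResult`, and `EchoRunner` are hypothetical names, the `run!` call only imitates the style of a TTY::Command instance, and the `:success` status symbol is an assumption (the page itself only shows `:error` being checked).

```ruby
# Hypothetical result object; it only exposes what this sketch needs.
FakeResult = Struct.new(:out, :ok) do
  def success?
    ok
  end
end

# Hypothetical stand-in for the tty object the pool passes to the runner.
class FakeTTY
  def run!(*cmd)
    FakeResult.new("ran: #{cmd.join(' ')}\n", true)
  end
end

# A runner only needs #call(tty, *args) and must return [status, output].
class EchoRunner
  def call(tty, *args)
    result = tty.run!("echo", *args)
    status = result.success? ? :success : :error # :success is an assumed symbol
    [status, result.out]
  end
end

status, output = EchoRunner.new.call(FakeTTY.new, "hello", "world")
```

Because the pool only relies on `#call`, a lambda or any other callable with the same arity and return shape should work just as well as a class.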
Defined Under Namespace
Instance Attribute Summary
- #jobs ⇒ Object (readonly): Returns the value of attribute jobs.
Instance Method Summary
- #error? ⇒ Boolean
- #initialize(runner:, workers:, printer:, jobs:, &name_f) ⇒ WorkerPool (constructor): A new instance of WorkerPool.
- #push(name, *args) ⇒ Object
- #push_all(jobs, &name_f) ⇒ Object
- #run_next ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(runner:, workers:, printer:, jobs:, &name_f) ⇒ WorkerPool
Returns a new instance of WorkerPool.
# File 'lib/cobra_commander/executor/worker_pool.rb', line 58
def initialize(runner:, workers:, printer:, jobs:, &name_f)
  @tty = ::CobraCommander::Executor::IsolatedPTY.new(printer: printer)
  @runner = runner
  @workers = Array.new(workers) { |id| Worker.new(id, self) }
  @jobs = []
  @queue = Queue.new

  push_all(jobs, &(name_f || :describe))
end
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
# File 'lib/cobra_commander/executor/worker_pool.rb', line 56
def jobs
  @jobs
end
Instance Method Details
#error? ⇒ Boolean
# File 'lib/cobra_commander/executor/worker_pool.rb', line 68
def error?
  jobs.map(&:status).any?(:error)
end
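The check in #error? relies on `Enumerable#any?` accepting a pattern, which is compared against each element with `===`. A small sketch, using a hypothetical `SketchJob` struct in place of the pool's Job class (whose definition is not shown on this page) and an assumed `:success` status:

```ruby
# Hypothetical stand-in for the pool's Job; only #status matters here.
SketchJob = Struct.new(:name, :status)

def any_error?(jobs)
  # any?(pattern) is true when pattern === element holds for some element
  jobs.map(&:status).any?(:error)
end

passing = [SketchJob.new("a", :success), SketchJob.new("b", :success)]
failing = passing + [SketchJob.new("c", :error)]

any_error?(passing) # false
any_error?(failing) # true
```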
#push(name, *args) ⇒ Object
# File 'lib/cobra_commander/executor/worker_pool.rb', line 76
def push(name, *args)
  Job.new(name, args).tap do |job|
    @jobs.push(job)
    @queue.push(job)
  end
end
#push_all(jobs, &name_f) ⇒ Object
# File 'lib/cobra_commander/executor/worker_pool.rb', line 72
def push_all(jobs, &name_f)
  jobs.each { push(name_f&.(_1), *Array(_1)) }
end
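To see what #push_all hands to #push for each job, the block body can be isolated in a helper. The helper name `name_and_args` is hypothetical and exists only for this sketch; it returns the name followed by the splatted arguments instead of creating a Job.

```ruby
# Mirrors the block body of #push_all without the pool itself:
# derive a name via name_f (nil when no block is given), then turn the
# job into an argument list with Array().
def name_and_args(job, &name_f)
  [name_f&.(job), *Array(job)]
end

name_and_args("web") { |j| j.to_s.upcase }        # ["WEB", "web"]
name_and_args(["web", "--fast"]) { |j| j.first }  # ["web", "web", "--fast"]
name_and_args("web", &:upcase)                    # ["WEB", "web"]
name_and_args("web")                              # [nil, "web"]
```

The `&:upcase` call shows the same Symbol-to-block conversion the constructor uses when it falls back to `:describe`. One thing to keep in mind: `Array()` converts a Hash into an array of key/value pairs, so a Hash job would be splatted as pairs rather than passed through as a single argument.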
#run_next ⇒ Object
# File 'lib/cobra_commander/executor/worker_pool.rb', line 96
def run_next
  return :exit if @stop
  return :exit unless (job = @queue.pop)

  job.resolve!(*@runner.call(@tty, *job.args))
end
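The `return :exit unless (job = @queue.pop)` line depends on how Ruby's `Queue` behaves once it is closed: pending items can still be popped, and popping from a closed, empty queue returns `nil` instead of blocking. A standalone sketch of that behavior (the job symbols are placeholders):

```ruby
queue = Queue.new
queue.push(:job_a)
queue.push(:job_b)
queue.close # no more pushes; pending items can still be popped

popped = []
while (job = queue.pop) # nil once the closed queue is drained
  popped << job
end

popped # [:job_a, :job_b]
```

Since #start closes the queue right after spawning the workers, pushing more jobs after that point would raise `ClosedQueueError`.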
#start ⇒ Object
# File 'lib/cobra_commander/executor/worker_pool.rb', line 83
def start
  @stop = false
  @workers.each(&:spawn)
  @queue.close
  begin
    @workers.each(&:wait)
  rescue Interrupt
    @workers.each(&:kill) if @stop
    @stop = true
    retry
  end
end
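The spawn, close, and wait sequence in #start can be sketched with plain threads. `MiniPool` is a hypothetical, simplified stand-in: the Worker class is not shown on this page, so the thread loop that calls `run_next` until it returns `:exit` is an assumption, and the interrupt handling is left out. In the listing above, the first Interrupt sets `@stop` so that each worker exits at its next `run_next` call, and a second Interrupt kills the workers.

```ruby
# Simplified stand-in for the pool; not the library's implementation.
class MiniPool
  attr_reader :results

  def initialize(jobs, workers: 2, &runner)
    @runner = runner
    @worker_count = workers
    @results = Queue.new # thread-safe collector for runner return values
    @queue = Queue.new
    jobs.each { |job| @queue.push(job) }
  end

  def run_next
    return :exit unless (job = @queue.pop)

    @results.push(@runner.call(job))
  end

  def start
    # Assumed worker behavior: call run_next until it returns :exit.
    threads = Array.new(@worker_count) do
      Thread.new { loop { break if run_next == :exit } }
    end
    @queue.close         # workers drain what is queued, then pop returns nil
    threads.each(&:join) # the real pool calls Worker#wait at this point
  end
end

pool = MiniPool.new([1, 2, 3, 4]) { |n| n * 10 }
pool.start
values = Array.new(pool.results.size) { pool.results.pop }.sort
```

Closing the queue before waiting is what lets every thread terminate on its own: each one eventually receives `nil` from `pop` and leaves the loop.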