Class: RocketJob::WorkerPool
- Inherits: Object (Object → RocketJob::WorkerPool)
- Includes: SemanticLogger::Loggable
- Defined in: lib/rocket_job/worker_pool.rb
Instance Attribute Summary
-
#server_name ⇒ Object
readonly
Returns the value of attribute server_name.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary
-
#find(id) ⇒ Object
Find a worker in the list by its id.
-
#initialize(server_name) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
-
#join(timeout = 5) ⇒ Object
Wait for all workers to stop.
-
#kill ⇒ Object
Kill Worker threads.
-
#living_count ⇒ Object
Returns [Integer] number of workers (threads) that are alive.
-
#log_backtraces ⇒ Object
Log the backtrace of each worker thread that is still alive.
-
#prune ⇒ Object
Returns [Integer] number of dead workers removed.
-
#rebalance(max_workers, stagger_start = false) ⇒ Object
Add new workers to get back to `max_workers`, if not already at `max_workers`. The `stagger_start` parameter controls whether to stagger when the workers poll for work the first time.
-
#stop ⇒ Object
Tell all workers to stop working.
Constructor Details
#initialize(server_name) ⇒ WorkerPool
Returns a new instance of WorkerPool.
    # File 'lib/rocket_job/worker_pool.rb', line 10
    def initialize(server_name)
      @server_name = server_name
      @workers     = Concurrent::Array.new
      @worker_id   = 0
    end
Instance Attribute Details
#server_name ⇒ Object (readonly)
Returns the value of attribute server_name.
    # File 'lib/rocket_job/worker_pool.rb', line 8
    def server_name
      @server_name
    end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
    # File 'lib/rocket_job/worker_pool.rb', line 8
    def workers
      @workers
    end
Instance Method Details
#find(id) ⇒ Object
Find a worker in the list by its id.

    # File 'lib/rocket_job/worker_pool.rb', line 17
    def find(id)
      workers.find { |worker| worker.id == id }
    end
#join(timeout = 5) ⇒ Object
Wait for all workers to stop. Returns [true] if all workers stopped. Returns [false] on timeout. The timeout is applied to each worker in turn, not to the pool as a whole.
    # File 'lib/rocket_job/worker_pool.rb', line 70
    def join(timeout = 5)
      while (worker = workers.first)
        return false unless worker.join(timeout)

        # Worker thread is dead
        workers.shift
      end
      true
    end
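Because the loop in `join` waits on one worker at a time, the worst-case wait is roughly the timeout multiplied by the number of workers. The following is a minimal sketch of the same loop using plain Ruby threads as stand-ins for RocketJob workers. The `join_all` helper is hypothetical and not part of RocketJob; it only assumes that a worker's `join` behaves like `Thread#join`, which returns `nil` on timeout.

```ruby
# Hypothetical stand-in: plain threads instead of RocketJob workers.
# Thread#join(timeout) returns nil on timeout and the thread itself otherwise.
def join_all(threads, timeout = 5)
  while (thread = threads.first)
    return false unless thread.join(timeout)

    # Thread has finished, drop it from the list
    threads.shift
  end
  true
end

# Both threads finish well within the 1 second allowed per thread.
quick       = [Thread.new { sleep 0.01 }, Thread.new { sleep 0.02 }]
all_stopped = join_all(quick, 1)

# This thread outlives the 0.05 second timeout, so join_all gives up.
stuck     = [Thread.new { sleep 5 }]
timed_out = join_all(stuck, 0.05)
stuck.each(&:kill)
```

Threads that finished are removed from the list as they are joined, so after a timeout the list still holds the threads that have not yet been confirmed dead.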
#kill ⇒ Object
Kill all worker threads and clear the list of workers.

    # File 'lib/rocket_job/worker_pool.rb', line 62
    def kill
      workers.each(&:kill)
      workers.clear
    end
#living_count ⇒ Object
Returns [Integer] number of workers (threads) that are alive.

    # File 'lib/rocket_job/worker_pool.rb', line 81
    def living_count
      workers.count(&:alive?)
    end
#log_backtraces ⇒ Object
Log the backtrace of each worker thread that is still alive.

    # File 'lib/rocket_job/worker_pool.rb', line 85
    def log_backtraces
      workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? }
    end
#prune ⇒ Object
Returns [Integer] number of dead workers removed.
    # File 'lib/rocket_job/worker_pool.rb', line 47
    def prune
      remove_count = workers.count - living_count
      return 0 if remove_count.zero?

      logger.info "Cleaned up #{remove_count} dead workers"
      workers.delete_if { |t| !t.alive? }
      remove_count
    end
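The pruning logic can be tried out in isolation. This is a minimal sketch that uses plain Ruby threads in place of RocketJob workers and omits the logging; the `prune_dead` helper is hypothetical and exists only for illustration.

```ruby
# Hypothetical stand-in: a list of plain threads instead of RocketJob workers.
def prune_dead(threads)
  remove_count = threads.count - threads.count(&:alive?)
  return 0 if remove_count.zero?

  threads.delete_if { |t| !t.alive? }
  remove_count
end

finished = Thread.new { :done }
finished.join                       # make sure this thread is dead
running  = Thread.new { sleep }     # sleeps until it is killed

threads   = [finished, running]
removed   = prune_dead(threads)     # only the finished thread is removed
remaining = threads.size

running.kill
```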
#rebalance(max_workers, stagger_start = false) ⇒ Object
Add new workers to get back to `max_workers`, if not already at `max_workers`.
Returns 0 when no new workers are needed, and -1 if a shutdown is detected while workers are being started.
Parameters
stagger_start
Whether to stagger when the workers poll for work the first time.
It spreads the queue polling over max_poll_seconds so
that not all workers poll at the same time.
As a result, a newly created job can be picked up by a worker in less than max_poll_seconds.
    # File 'lib/rocket_job/worker_pool.rb', line 28
    def rebalance(max_workers, stagger_start = false)
      count = max_workers.to_i - living_count
      return 0 unless count.positive?

      logger.info("#{'Stagger ' if stagger_start}Starting #{count} workers")

      add_one
      count -= 1
      delay = Config.max_poll_seconds.to_f / max_workers

      count.times.each do
        sleep(delay) if stagger_start
        return -1 if Supervisor.shutdown?

        add_one
      end
    end
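To make the stagger arithmetic concrete, here is a small worked sketch. The values for `max_poll_seconds` and `max_workers` are illustrative assumptions, not values read from a real RocketJob configuration, and the computed offsets ignore the time it takes to start each worker.

```ruby
# Illustrative values only; not read from a real RocketJob configuration.
max_poll_seconds = 5
max_workers      = 10

# Same arithmetic as in rebalance: spread worker starts across one poll interval.
delay = max_poll_seconds.to_f / max_workers

# Approximate start offset (in seconds) of each worker when all of them are
# missing: the first starts immediately, each later one after a further delay.
offsets = Array.new(max_workers) { |i| (i * delay).round(2) }
```

With these assumed values the workers start about half a second apart, so their first polls are spread over most of the poll interval instead of all landing at the same moment.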
#stop ⇒ Object
Tell all workers to stop working.
    # File 'lib/rocket_job/worker_pool.rb', line 57
    def stop
      workers.each(&:shutdown!)
    end
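`stop` only asks the workers to stop; it does not wait for them. One plausible way to combine it with `join` and `kill` is sketched below. The `SketchWorker` class is a hypothetical stand-in written for this example. Only its method names (`shutdown!`, `join`, `kill`, `alive?`) are taken from the calls WorkerPool makes on its workers, and the ordering shown is an assumption rather than documented RocketJob behavior.

```ruby
# Hypothetical stand-in for a RocketJob worker; only the method names
# (shutdown!, join, kill, alive?) come from the WorkerPool source.
class SketchWorker
  def initialize
    @shutdown = false
    # Loop until asked to shut down, checking the flag every 10 ms.
    @thread = Thread.new { sleep 0.01 until @shutdown }
  end

  def shutdown!
    @shutdown = true
  end

  def join(timeout)
    @thread.join(timeout)
  end

  def kill
    @thread.kill
  end

  def alive?
    @thread.alive?
  end
end

workers = Array.new(3) { SketchWorker.new }

# 1. Ask every worker to stop (mirrors WorkerPool#stop).
workers.each(&:shutdown!)

# 2. Wait for each worker in turn, up to 1 second each (mirrors WorkerPool#join).
stopped = workers.all? { |worker| worker.join(1) }

# 3. Fall back to killing whatever is still running (mirrors WorkerPool#kill).
workers.each(&:kill) unless stopped

alive_after = workers.count(&:alive?)
```

Asking first and killing only as a fallback gives each worker a chance to finish its current unit of work before its thread is terminated.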