Class: Workerholic::WorkerBalancer
- Inherits:
-
Object
- Object
- Workerholic::WorkerBalancer
- Defined in:
- lib/workerholic/worker_balancer.rb
Instance Attribute Summary collapse
-
#alive ⇒ Object
readonly
Returns the value of attribute alive.
-
#auto ⇒ Object
readonly
Returns the value of attribute auto.
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
- #assign_one_worker_per_queue ⇒ Object
- #assign_workers_to_queue(queue, workers_count, total_workers_count) ⇒ Object
- #auto_balance_workers ⇒ Object
- #auto_balanced_workers_distribution ⇒ Object
- #current_workers_count_per_queue ⇒ Object
- #distribute_unassigned_worker(total_workers_count) ⇒ Object
- #evenly_balance_workers ⇒ Object
- #evenly_balanced_workers_distribution ⇒ Object
- #fetch_queues ⇒ Object
-
#initialize(opts = {}) ⇒ WorkerBalancer
constructor
A new instance of WorkerBalancer.
- #io_queues ⇒ Object
- #kill ⇒ Object
- #output_balancer_stats ⇒ Object
- #provision_queues(qs, average_jobs_count_per_worker, total_workers_count) ⇒ Object
- #round(n) ⇒ Object
- #start ⇒ Object
- #total_jobs ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ WorkerBalancer
Returns a new instance of WorkerBalancer.
6 7 8 9 10 11 12 13 |
# File 'lib/workerholic/worker_balancer.rb', line 6 def initialize(opts = {}) @storage = Storage::RedisWrapper.new @queues = fetch_queues @workers = opts[:workers] || [] @alive = true @logger = LogManager.new @auto = opts[:auto_balance] end |
Instance Attribute Details
#alive ⇒ Object (readonly)
Returns the value of attribute alive.
3 4 5 |
# File 'lib/workerholic/worker_balancer.rb', line 3 def alive @alive end |
#auto ⇒ Object (readonly)
Returns the value of attribute auto.
3 4 5 |
# File 'lib/workerholic/worker_balancer.rb', line 3 def auto @auto end |
#queues ⇒ Object
Returns the value of attribute queues.
4 5 6 |
# File 'lib/workerholic/worker_balancer.rb', line 4 def queues @queues end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
3 4 5 |
# File 'lib/workerholic/worker_balancer.rb', line 3 def storage @storage end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
3 4 5 |
# File 'lib/workerholic/worker_balancer.rb', line 3 def thread @thread end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
3 4 5 |
# File 'lib/workerholic/worker_balancer.rb', line 3 def workers @workers end |
Instance Method Details
#assign_one_worker_per_queue ⇒ Object
87 88 89 90 91 92 93 94 95 |
# File 'lib/workerholic/worker_balancer.rb', line 87 def assign_one_worker_per_queue index = 0 while index < queues.size && index < workers.size workers[index].queue = queues[index] index += 1 end index end |
#assign_workers_to_queue(queue, workers_count, total_workers_count) ⇒ Object
124 125 126 127 128 |
# File 'lib/workerholic/worker_balancer.rb', line 124 def assign_workers_to_queue(queue, workers_count, total_workers_count) total_workers_count.upto(total_workers_count + workers_count - 1) do |i| workers.to_a[i].queue = Queue.new(queue.name) end end |
#auto_balance_workers ⇒ Object
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/workerholic/worker_balancer.rb', line 59 def auto_balance_workers @thread = Thread.new do while alive auto_balanced_workers_distribution output_balancer_stats sleep 1 end end end |
#auto_balanced_workers_distribution ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/workerholic/worker_balancer.rb', line 70 def auto_balanced_workers_distribution self.queues = fetch_queues total_workers_count = assign_one_worker_per_queue remaining_workers_count = workers.size - total_workers_count average_jobs_count_per_worker = total_jobs / remaining_workers_count.to_f total_workers_count = provision_queues(io_queues, average_jobs_count_per_worker, total_workers_count) distribute_unassigned_worker(total_workers_count) end |
#current_workers_count_per_queue ⇒ Object
158 159 160 161 162 163 164 165 166 |
# File 'lib/workerholic/worker_balancer.rb', line 158 def current_workers_count_per_queue workers.reduce({}) do |result, worker| if worker.queue result[worker.queue.name] = result[worker.queue.name] ? result[worker.queue.name] + 1 : 1 end result end end |
#distribute_unassigned_worker(total_workers_count) ⇒ Object
136 137 138 |
# File 'lib/workerholic/worker_balancer.rb', line 136 def distribute_unassigned_worker(total_workers_count) workers[workers.size - 1].queue = io_queues.find { |q| q.size == io_queues.map(&:size).max } if workers.size - total_workers_count == 1 end |
#evenly_balance_workers ⇒ Object
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/workerholic/worker_balancer.rb', line 29 def evenly_balance_workers @thread = Thread.new do while alive evenly_balanced_workers_distribution output_balancer_stats sleep 1 end end end |
#evenly_balanced_workers_distribution ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/workerholic/worker_balancer.rb', line 40 def evenly_balanced_workers_distribution self.queues = fetch_queues total_workers_count = assign_one_worker_per_queue remaining_workers_count = workers.size - total_workers_count queues.each do |queue| workers_count = remaining_workers_count / queues.size workers_count = round(workers_count) assign_workers_to_queue(queue, workers_count, total_workers_count) total_workers_count += workers_count end distribute_unassigned_worker(total_workers_count) end |
#fetch_queues ⇒ Object
83 84 85 |
# File 'lib/workerholic/worker_balancer.rb', line 83 def fetch_queues storage.fetch_queue_names.map { |queue_name| Queue.new(queue_name) } end |
#io_queues ⇒ Object
101 102 103 104 105 106 107 108 109 |
# File 'lib/workerholic/worker_balancer.rb', line 101 def io_queues io_qs = queues.select { |q| q.name.match(/.*-io$/) } if io_qs.empty? queues else io_qs end end |
#kill ⇒ Object
23 24 25 |
# File 'lib/workerholic/worker_balancer.rb', line 23 def kill thread.kill end |
#output_balancer_stats ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/workerholic/worker_balancer.rb', line 140 def output_balancer_stats queues_with_size = queues.map { |q| { name: q.name, size: q.size } } queues_with_size.each do |q| output = <<~LOG Queue #{q[:name]}: => #{q[:size]} jobs => #{current_workers_count_per_queue[q[:name]]} workers LOG @logger.info(output) end if queues_with_size.empty? @logger.info("DONE") raise Interrupt end end |
#provision_queues(qs, average_jobs_count_per_worker, total_workers_count) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/workerholic/worker_balancer.rb', line 111 def provision_queues(qs, average_jobs_count_per_worker, total_workers_count) qs.each do |q| workers_count = q.size / average_jobs_count_per_worker workers_count = round(workers_count) assign_workers_to_queue(q, workers_count, total_workers_count) total_workers_count += workers_count end total_workers_count end |
#round(n) ⇒ Object
130 131 132 133 134 |
# File 'lib/workerholic/worker_balancer.rb', line 130 def round(n) return n.floor if n % 1 == 0.5 n.round end |
#start ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/workerholic/worker_balancer.rb', line 15 def start if auto auto_balance_workers else evenly_balance_workers end end |
#total_jobs ⇒ Object
97 98 99 |
# File 'lib/workerholic/worker_balancer.rb', line 97 def total_jobs io_queues.map(&:size).reduce(:+) || 0 end |