Class: Workerholic::WorkerBalancer

Inherits:
Object
  • Object
show all
Defined in:
lib/workerholic/worker_balancer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ WorkerBalancer

Returns a new instance of WorkerBalancer.



6
7
8
9
10
11
12
13
# File 'lib/workerholic/worker_balancer.rb', line 6

def initialize(opts = {})
  @storage = Storage::RedisWrapper.new
  @queues = fetch_queues
  @workers = opts[:workers] || []
  @alive = true
  @logger = LogManager.new
  @auto = opts[:auto_balance]
end

Instance Attribute Details

#aliveObject (readonly)

Returns the value of attribute alive.



3
4
5
# File 'lib/workerholic/worker_balancer.rb', line 3

def alive
  @alive
end

#autoObject (readonly)

Returns the value of attribute auto.



3
4
5
# File 'lib/workerholic/worker_balancer.rb', line 3

def auto
  @auto
end

#queuesObject

Returns the value of attribute queues.



4
5
6
# File 'lib/workerholic/worker_balancer.rb', line 4

def queues
  @queues
end

#storageObject (readonly)

Returns the value of attribute storage.



3
4
5
# File 'lib/workerholic/worker_balancer.rb', line 3

def storage
  @storage
end

#threadObject (readonly)

Returns the value of attribute thread.



3
4
5
# File 'lib/workerholic/worker_balancer.rb', line 3

def thread
  @thread
end

#workersObject (readonly)

Returns the value of attribute workers.



3
4
5
# File 'lib/workerholic/worker_balancer.rb', line 3

def workers
  @workers
end

Instance Method Details

#assign_one_worker_per_queueObject



87
88
89
90
91
92
93
94
95
# File 'lib/workerholic/worker_balancer.rb', line 87

def assign_one_worker_per_queue
  index = 0
  while index < queues.size && index < workers.size
    workers[index].queue = queues[index]
    index += 1
  end

  index
end

#assign_workers_to_queue(queue, workers_count, total_workers_count) ⇒ Object



124
125
126
127
128
# File 'lib/workerholic/worker_balancer.rb', line 124

def assign_workers_to_queue(queue, workers_count, total_workers_count)
  total_workers_count.upto(total_workers_count + workers_count - 1) do |i|
    workers.to_a[i].queue = Queue.new(queue.name)
  end
end

#auto_balance_workersObject



59
60
61
62
63
64
65
66
67
68
# File 'lib/workerholic/worker_balancer.rb', line 59

def auto_balance_workers
  @thread = Thread.new do
    while alive
      auto_balanced_workers_distribution
      output_balancer_stats

      sleep 1
    end
  end
end

#auto_balanced_workers_distributionObject



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/workerholic/worker_balancer.rb', line 70

def auto_balanced_workers_distribution
  self.queues = fetch_queues

  total_workers_count = assign_one_worker_per_queue

  remaining_workers_count = workers.size - total_workers_count
  average_jobs_count_per_worker = total_jobs / remaining_workers_count.to_f

  total_workers_count = provision_queues(io_queues, average_jobs_count_per_worker, total_workers_count)

  distribute_unassigned_worker(total_workers_count)
end

#current_workers_count_per_queueObject



158
159
160
161
162
163
164
165
166
# File 'lib/workerholic/worker_balancer.rb', line 158

def current_workers_count_per_queue
  workers.reduce({}) do |result, worker|
    if worker.queue
      result[worker.queue.name] = result[worker.queue.name] ? result[worker.queue.name] + 1 : 1
    end

    result
  end
end

#distribute_unassigned_worker(total_workers_count) ⇒ Object



136
137
138
# File 'lib/workerholic/worker_balancer.rb', line 136

def distribute_unassigned_worker(total_workers_count)
  workers[workers.size - 1].queue = io_queues.find { |q| q.size == io_queues.map(&:size).max } if workers.size - total_workers_count == 1
end

#evenly_balance_workersObject



29
30
31
32
33
34
35
36
37
38
# File 'lib/workerholic/worker_balancer.rb', line 29

def evenly_balance_workers
  @thread = Thread.new do
    while alive
      evenly_balanced_workers_distribution
      output_balancer_stats

      sleep 1
    end
  end
end

#evenly_balanced_workers_distributionObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/workerholic/worker_balancer.rb', line 40

def evenly_balanced_workers_distribution
  self.queues = fetch_queues

  total_workers_count = assign_one_worker_per_queue

  remaining_workers_count = workers.size - total_workers_count

  queues.each do |queue|
    workers_count = remaining_workers_count / queues.size
    workers_count = round(workers_count)

    assign_workers_to_queue(queue, workers_count, total_workers_count)

    total_workers_count += workers_count
  end

  distribute_unassigned_worker(total_workers_count)
end

#fetch_queuesObject



83
84
85
# File 'lib/workerholic/worker_balancer.rb', line 83

def fetch_queues
  storage.fetch_queue_names.map { |queue_name| Queue.new(queue_name) }
end

#io_queuesObject



101
102
103
104
105
106
107
108
109
# File 'lib/workerholic/worker_balancer.rb', line 101

def io_queues
  io_qs = queues.select { |q| q.name.match(/.*-io$/) }

  if io_qs.empty?
    queues
  else
    io_qs
  end
end

#killObject



23
24
25
# File 'lib/workerholic/worker_balancer.rb', line 23

def kill
  thread.kill
end

#output_balancer_statsObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/workerholic/worker_balancer.rb', line 140

def output_balancer_stats
  queues_with_size = queues.map { |q| { name: q.name, size: q.size } }

  queues_with_size.each do |q|
    output = <<~LOG
      Queue #{q[:name]}:
      => #{q[:size]} jobs
      => #{current_workers_count_per_queue[q[:name]]} workers
    LOG
    @logger.info(output)
  end

  if queues_with_size.empty?
    @logger.info("DONE")
    raise Interrupt
  end
end

#provision_queues(qs, average_jobs_count_per_worker, total_workers_count) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/workerholic/worker_balancer.rb', line 111

def provision_queues(qs, average_jobs_count_per_worker, total_workers_count)
  qs.each do |q|
    workers_count = q.size / average_jobs_count_per_worker
    workers_count = round(workers_count)

    assign_workers_to_queue(q, workers_count, total_workers_count)

    total_workers_count += workers_count
  end

  total_workers_count
end

#round(n) ⇒ Object



130
131
132
133
134
# File 'lib/workerholic/worker_balancer.rb', line 130

def round(n)
  return n.floor if n % 1 == 0.5

  n.round
end

#startObject



15
16
17
18
19
20
21
# File 'lib/workerholic/worker_balancer.rb', line 15

def start
  if auto
    auto_balance_workers
  else
    evenly_balance_workers
  end
end

#total_jobsObject



97
98
99
# File 'lib/workerholic/worker_balancer.rb', line 97

def total_jobs
  io_queues.map(&:size).reduce(:+) || 0
end