Class: Fluent::Plugin::ForwardOutput::LoadBalancer
- Inherits:
-
Object
- Object
- Fluent::Plugin::ForwardOutput::LoadBalancer
- Defined in:
- lib/fluent/plugin/out_forward/load_balancer.rb
Instance Method Summary collapse
-
#initialize(log) ⇒ LoadBalancer
constructor
A new instance of LoadBalancer.
- #rebuild_weight_array(nodes) ⇒ Object (also: #rebalance)
- #select_healthy_node ⇒ Object (also: #select_service)
Constructor Details
#initialize(log) ⇒ LoadBalancer
Returns a new instance of LoadBalancer.
23 24 25 26 27 28 29 |
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 23 def initialize(log) @log = log @weight_array = [] @rand_seed = Random.new.seed @rr = 0 @mutex = Mutex.new end |
Instance Method Details
#rebuild_weight_array(nodes) ⇒ Object Also known as: rebalance
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 58 def rebuild_weight_array(nodes) standby_nodes, regular_nodes = nodes.select { |e| e.weight > 0 }.partition {|n| n.standby? } lost_weight = 0 regular_nodes.each {|n| unless n.available? lost_weight += n.weight end } @log.debug("rebuilding weight array", lost_weight: lost_weight) if lost_weight > 0 standby_nodes.each {|n| if n.available? regular_nodes << n @log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight lost_weight -= n.weight break if lost_weight <= 0 end } end weight_array = [] if regular_nodes.empty? @log.warn('No nodes are available') @mutex.synchronize do @weight_array = weight_array end return @weight_array end gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) } regular_nodes.each {|n| (n.weight / gcd).times { weight_array << n } } # for load balancing during detecting crashed servers coe = (regular_nodes.size * 6) / weight_array.size weight_array *= coe if coe > 1 r = Random.new(@rand_seed) weight_array.sort_by! { r.rand } @mutex.synchronize do @weight_array = weight_array end end |
#select_healthy_node ⇒ Object Also known as: select_service
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_forward/load_balancer.rb', line 31 def select_healthy_node error = nil # Don't care about the change of @weight_array's size while looping since # it's only used for determining the number of loops and it is not so important. wlen = @weight_array.size wlen.times do node = @mutex.synchronize do r = @rr % @weight_array.size @rr = (r + 1) % @weight_array.size @weight_array[r] end next unless node.available? begin ret = yield node return ret, node rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end raise error if error raise NoNodesAvailable, "no nodes are available" end |