Class: Fluent::PluginHelper::ServiceDiscovery::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin_helper/service_discovery/manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager

Returns a new instance of Manager.



24
25
26
27
28
29
30
31
32
33
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 24

# Builds a manager with no discoveries and no known services.
#
# @param log [Object] logger; +run_once+ calls +warn+ and +error+ on it
# @param load_balancer [Object, nil] balancer to delegate to; when nil (or
#   false) a fresh RoundRobinBalancer is created instead
# @param custom_build_method [Object, nil] stored as-is in
#   +@custom_build_method+; it is not used by the methods visible here
def initialize(log:, load_balancer: nil, custom_build_method: nil)
  # Collaborators supplied by the caller.
  @log = log
  @load_balancer = load_balancer || RoundRobinBalancer.new
  @custom_build_method = custom_build_method

  # Internal state, filled in later by #configure and #run_once.
  @discoveries = []
  @services = {}
  @queue = Queue.new

  # Stays true until #configure sees a discovery whose type is not :static.
  @static_config = true
end

Instance Method Details

#configure(opts, parent: nil) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 35

# Instantiates and configures one service discovery plugin per entry in
# +opts+, registers the services each plugin reports, then rebalances.
#
# @param opts [Enumerable] entries that respond to +[]+ with the keys
#   +:type+ (plugin type handed to Fluent::Plugin.new_sd) and +:conf+
#   (configuration handed to the plugin's +configure+)
# @param parent [Object, nil] forwarded to Fluent::Plugin.new_sd
# @return [Object] whatever #rebalance returns
def configure(opts, parent: nil)
  opts.each do |opt|
    discovery = Fluent::Plugin.new_sd(opt[:type], parent: parent)
    discovery.configure(opt[:conf])

    # Index every initially known service by its discovery id.
    discovery.services.each { |service| @services[service.discovery_id] = build_service(service) }
    @discoveries << discovery

    # A single non-static discovery makes the whole configuration dynamic.
    @static_config = false if @static_config && opt[:type] != :static
  end

  rebalance
end

#rebalance ⇒ Object



97
98
99
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 97

# Hands the current list of services to the load balancer.
#
# @return [Object] whatever the load balancer's +rebalance+ returns
def rebalance
  current_services = services
  @load_balancer.rebalance(current_services)
end

#run_once ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 71

# Processes the messages that are queued at the moment of the call, then
# rebalances.
#
# Anything popped that is not a DiscoveryMessage is logged as a warning and
# skipped. An error raised by +handle_message+ is logged and does not stop
# the remaining messages from being processed.
#
# @return [nil, Object] nil when the queue was empty (no rebalance happens),
#   otherwise whatever #rebalance returns
def run_once
  # The size is only a snapshot; racing with producers is intentionally
  # not handled here.
  pending = @queue.size
  return if pending == 0

  pending.times do
    message = @queue.pop

    if message.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
      begin
        handle_message(message)
      rescue => error
        @log.error(error)
      end
    else
      @log.warn("BUG: #{message}")
    end
  end

  rebalance
end

#select_service(&block) ⇒ Object



101
102
103
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 101

# Delegates service selection to the load balancer, passing the given block
# straight through.
#
# @return [Object] whatever the load balancer's +select_service+ returns
def select_service(&block)
  @load_balancer.select_service(&block)
end

#services ⇒ Object



105
106
107
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 105

# Lists the services currently registered with this manager.
#
# @return [Array] the values of the internal id-to-service hash, as a new
#   array
def services
  @services.values
end

#start ⇒ Object



57
58
59
60
61
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 57

# Starts every configured discovery, giving each one the manager's message
# queue (the same queue #run_once pops from).
#
# @return [Array] the list of discoveries
def start
  @discoveries.each { |discovery| discovery.start(@queue) }
end

#static_config? ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 53

# Reports whether the configuration is purely static.
#
# The flag starts out true in #initialize and is cleared by #configure as
# soon as a discovery with a type other than :static is seen.
#
# @return [Boolean] current value of the flag
def static_config?
  @static_config
end