Class: Fluent::PluginHelper::ServiceDiscovery::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin_helper/service_discovery/manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager

Returns a new instance of Manager.



24
25
26
27
28
29
30
31
32
33
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 24

def initialize(log:, load_balancer: nil, custom_build_method: nil)
  @log = log
  @load_balancer = load_balancer || RoundRobinBalancer.new
  @custom_build_method = custom_build_method

  @discoveries = []
  @services = {}
  @queue = Queue.new
  @static_config = true
end

Instance Method Details

#configure(configs, parent: nil) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 35

def configure(configs, parent: nil)
  configs.each do |config|
    type, conf = if config.has_key?(:conf) # for compatibility with initial API
                   [config[:type], config[:conf]]
                 else
                   [config['@type'], config]
                 end

    sd = Fluent::Plugin.new_sd(type, parent: parent)
    sd.configure(conf)

    sd.services.each do |s|
      @services[s.discovery_id] = build_service(s)
    end
    @discoveries << sd

    if @static_config && type.to_sym != :static
      @static_config = false
    end
  end

  rebalance
end

#rebalanceObject



103
104
105
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 103

def rebalance
  @load_balancer.rebalance(services)
end

#run_onceObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 77

def run_once
  # Don't care race in this loop intentionally
  s = @queue.size

  if s == 0
    return
  end

  s.times do
    msg = @queue.pop

    unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
      @log.warn("BUG: #{msg}")
      next
    end

    begin
      handle_message(msg)
    rescue => e
      @log.error(e)
    end
  end

  rebalance
end

#select_service(&block) ⇒ Object



107
108
109
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 107

def select_service(&block)
  @load_balancer.select_service(&block)
end

#servicesObject



111
112
113
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 111

def services
  @services.values
end

#startObject



63
64
65
66
67
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 63

def start
  @discoveries.each do |d|
    d.start(@queue)
  end
end

#static_config?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 59

def static_config?
  @static_config
end