Class: Fluent::PluginHelper::ServiceDiscovery::Manager
- Inherits: Object
- Inheritance hierarchy: Object → Fluent::PluginHelper::ServiceDiscovery::Manager
- Defined in:
- lib/fluent/plugin_helper/service_discovery/manager.rb
Instance Method Summary
- #configure(configs, parent: nil) ⇒ Object
- #initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager (constructor): A new instance of Manager.
- #rebalance ⇒ Object
- #run_once ⇒ Object
- #select_service(&block) ⇒ Object
- #services ⇒ Object
- #start ⇒ Object
- #static_config? ⇒ Boolean
Constructor Details
#initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager
Returns a new instance of Manager.
24 25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 24
#
# Builds a manager that starts with no discoveries and no services.
#
# @param log [Object] logger kept in @log; other methods of this class
#   call #warn and #error on it
# @param load_balancer [Object, nil] balancer to delegate to; when it is
#   nil or false a RoundRobinBalancer is created instead
# @param custom_build_method [Object, nil] stored unchanged in
#   @custom_build_method; presumably consumed by build_service, which is
#   defined outside this excerpt (TODO confirm)
def initialize(log:, load_balancer: nil, custom_build_method: nil)
  # Collaborators handed in by the caller.
  @log = log
  @load_balancer = load_balancer
  @load_balancer ||= RoundRobinBalancer.new
  @custom_build_method = custom_build_method

  # Mutable state populated later by #configure / #start.
  @discoveries = []      # configured service-discovery plugin instances
  @services = {}         # discovery_id => built service
  @queue = Queue.new     # handed to every discovery in #start, drained by #run_once
  @static_config = true  # cleared by #configure on the first non-:static type
end
Instance Method Details
#configure(configs, parent: nil) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 35
#
# Instantiates and configures one service-discovery plugin per entry,
# registers the services each plugin reports, then rebalances.
#
# Every entry is either
# * a hash with a :conf key (the initial API), read as
#   { type: ..., conf: ... }, or
# * a config object whose '@type' key names the plugin; the object itself
#   is then used as the plugin configuration.
#
# @param configs [Enumerable] entries as described above
# @param parent [Object, nil] forwarded to Fluent::Plugin.new_sd
# @return [Object] whatever #rebalance returns
def configure(configs, parent: nil)
  configs.each do |entry|
    if entry.has_key?(:conf)
      # for compatibility with initial API
      plugin_type = entry[:type]
      plugin_conf = entry[:conf]
    else
      plugin_type = entry['@type']
      plugin_conf = entry
    end

    discovery = Fluent::Plugin.new_sd(plugin_type, parent: parent)
    discovery.configure(plugin_conf)

    # build_service is defined outside this excerpt.
    discovery.services.each { |svc| @services[svc.discovery_id] = build_service(svc) }
    @discoveries << discovery

    # Stays true only while every configured type so far is :static.
    @static_config &&= plugin_type.to_sym == :static
  end

  rebalance
end
#rebalance ⇒ Object
103 104 105 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 103
#
# Hands the current list of services to the load balancer.
#
# @return [Object] whatever the balancer's #rebalance returns
def rebalance
  current_services = services
  @load_balancer.rebalance(current_services)
end
#run_once ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 77
#
# Drains the messages that are in the queue at the time of the call,
# applies each one, and rebalances afterwards.
#
# Returns nil without rebalancing when the queue is empty. Entries that
# are not DiscoveryMessage instances are logged as a warning and skipped.
# An error raised while handling one message is logged and does not stop
# the remaining messages from being processed.
#
# @return [Object, nil] nil when nothing was queued, otherwise whatever
#   #rebalance returns
def run_once
  # Don't care race in this loop intentionally
  pending = @queue.size
  return if pending == 0

  pending.times do
    msg = @queue.pop

    unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
      @log.warn("BUG: #{msg}")
      next
    end

    begin
      # The listing had lost the call target here (bare `(msg)`), which
      # silently discarded every message. handle_message is defined outside
      # this excerpt; presumably it applies the message to @services —
      # NOTE(review): confirm against the private helper in manager.rb.
      handle_message(msg)
    rescue => e
      @log.error(e)
    end
  end

  rebalance
end
#select_service(&block) ⇒ Object
107 108 109 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 107
#
# Delegates service selection to the load balancer, forwarding the block
# (if any) unchanged.
#
# @return [Object] whatever the balancer's #select_service returns
def select_service(&block)
  balancer = @load_balancer
  balancer.select_service(&block)
end
#services ⇒ Object
111 112 113 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 111
#
# Lists the registered services, i.e. the values of the
# discovery_id => service table, without their keys.
#
# @return [Array] the values of @services
def services
  registry = @services
  registry.values
end
#start ⇒ Object
63 64 65 66 67 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 63
#
# Starts every configured discovery, passing each one the manager's queue.
# The same queue is drained by #run_once.
#
# @return [Array] the list of discoveries (the result of #each)
def start
  @discoveries.each { |discovery| discovery.start(@queue) }
end
#static_config? ⇒ Boolean
59 60 61 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 59
#
# Reports the @static_config flag: it starts out true and #configure
# clears it as soon as a discovery whose type is not :static is configured.
#
# @return [Boolean] current value of @static_config
def static_config?
  @static_config
end