Class: Fluent::PluginHelper::ServiceDiscovery::Manager
- Inherits: Object
- Inheritance hierarchy: Object → Fluent::PluginHelper::ServiceDiscovery::Manager
- Defined in:
- lib/fluent/plugin_helper/service_discovery/manager.rb
Instance Method Summary
- #configure(opts, parent: nil) ⇒ Object
- #initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager (constructor) — returns a new instance of Manager.
- #rebalance ⇒ Object
- #run_once ⇒ Object
- #select_service(&block) ⇒ Object
- #services ⇒ Object
- #start ⇒ Object
- #static_config? ⇒ Boolean
Constructor Details
#initialize(log:, load_balancer: nil, custom_build_method: nil) ⇒ Manager
Returns a new instance of Manager.
24 25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 24
# Builds a manager that starts with no discovery plugins and no known services.
#
# @param log [Object] logger kept in @log; #run_once calls #warn and #error on it
# @param load_balancer [Object, nil] balancer to delegate to; a RoundRobinBalancer
#   is created when this is nil (or false)
# @param custom_build_method [Object, nil] stored unchanged in @custom_build_method
#   (not used by the methods visible on this page)
def initialize(log:, load_balancer: nil, custom_build_method: nil)
  @log = log
  @load_balancer = load_balancer || RoundRobinBalancer.new
  @custom_build_method = custom_build_method

  @discoveries = []
  @services = {}
  @queue = Queue.new
  # Stays true until #configure sees a discovery whose type is not :static.
  @static_config = true
end
Instance Method Details
#configure(opts, parent: nil) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 35
# Instantiates and configures one discovery plugin per entry in +opts+, registers
# the services each plugin reports, then rebalances.
#
# @param opts [Enumerable<Hash>] entries read via opt[:type] (plugin type) and
#   opt[:conf] (passed to the plugin's #configure)
# @param parent [Object, nil] forwarded to Fluent::Plugin.new_sd
# @return [Object] whatever #rebalance returns
def configure(opts, parent: nil)
  opts.each do |opt|
    discovery = Fluent::Plugin.new_sd(opt[:type], parent: parent)
    discovery.configure(opt[:conf])

    # Services are keyed by discovery_id, so a later entry with the same id replaces the earlier one.
    discovery.services.each do |service|
      @services[service.discovery_id] = build_service(service)
    end
    @discoveries << discovery

    # Any discovery that is not :static makes the overall configuration non-static.
    @static_config = false unless opt[:type] == :static
  end

  rebalance
end
#rebalance ⇒ Object
97 98 99 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 97
# Hands the current service list (see #services) to the load balancer.
#
# @return [Object] whatever the load balancer's #rebalance returns
def rebalance
  @load_balancer.rebalance(services)
end
#run_once ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 71
# Drains the discovery messages queued so far, handles each one, then rebalances.
#
# @return [nil, Object] nil (without rebalancing) when the queue is empty,
#   otherwise whatever #rebalance returns
def run_once
  # The size is sampled once; messages pushed while draining wait for the next
  # call. This race is tolerated intentionally.
  pending = @queue.size
  return if pending.zero?

  pending.times do
    msg = @queue.pop

    unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
      @log.warn("BUG: #{msg}")
      next
    end

    begin
      # NOTE(review): the flattened listing showed a bare `(msg)` here, which
      # silently discards the message. `handle_message` is the manager's private
      # handler in upstream Fluentd (not shown on this page) -- confirm the name.
      handle_message(msg)
    rescue => e
      # One failing message is logged and must not stop the rest of the batch.
      @log.error(e)
    end
  end

  rebalance
end
#select_service(&block) ⇒ Object
101 102 103 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 101
# Delegates service selection to the load balancer, forwarding the caller's block.
#
# @yield forwarded unchanged to the load balancer's #select_service
# @return [Object] whatever the load balancer's #select_service returns
def select_service(&block)
  @load_balancer.select_service(&block)
end
#services ⇒ Object
105 106 107 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 105
# Lists the currently registered services.
#
# @return [Array] the values of the @services table (keyed by discovery_id in #configure)
def services
  @services.values
end
#start ⇒ Object
57 58 59 60 61 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 57
# Starts every configured discovery plugin, passing each one the manager's
# message queue (the same queue #run_once drains).
#
# @return [Array] the @discoveries array
def start
  @discoveries.each do |discovery|
    discovery.start(@queue)
  end
end
#static_config? ⇒ Boolean
53 54 55 |
# File 'lib/fluent/plugin_helper/service_discovery/manager.rb', line 53
# Reports whether every discovery configured so far was of type :static.
#
# @return [Boolean] true from construction until #configure sees a non-static type
def static_config?
  @static_config
end