Class: DruidConfig::ZK
- Inherits:
- Object
- Object
- DruidConfig::ZK
- Defined in:
- lib/druid_config/zk.rb
Overview
Connects to Zookeeper and retrieves information about the nodes in a Druid cluster.
Constant Summary
- COORDINATOR =
Coordinator service
'coordinator'
- OVERLORD =
'overlord'
- SERVICES =
[COORDINATOR, OVERLORD]
Instance Method Summary
-
#check_service(service) ⇒ Object
Check a given Druid service.
-
#check_services ⇒ Object
Check current services.
-
#close! ⇒ Object
Force the Zookeeper connection to close.
-
#coordinator ⇒ Object
Return the URI of a random available coordinator.
-
#initialize(uri, opts = {}) ⇒ ZK
constructor
Initialize variables and call register.
-
#overlord ⇒ Object
Return the URI of a random available overlord.
-
#random_node(service) ⇒ Object
Return the URI of a random registered node of a service.
-
#register ⇒ Object
Load the data from Zookeeper.
-
#register_service(service, brokers) ⇒ Object
Register a new service.
-
#services ⇒ Object
Get all available services.
-
#to_s ⇒ Object
-
#unregister_service(service) ⇒ Object
Unregister a service.
-
#unwatch_service(service) ⇒ Object
Stop watching a service.
-
#verify_node(name, service) ⇒ Object
Verify whether a Coordinator is available.
-
#watch_path(service) ⇒ Object
Return the path of a service in Zookeeper.
-
#watch_service(service) ⇒ Object
Set a watcher for a service.
Constructor Details
#initialize(uri, opts = {}) ⇒ ZK
Initialize variables and call register
Parameters:
- uri
-
URI of Zookeeper
- opts
-
Hash with options:
- discovery_path: Custom Zookeeper path used for Druid service discovery (defaults to '/discovery')
# File 'lib/druid_config/zk.rb', line 31
def initialize(uri, opts = {})
  # Control Zookeeper connection
  @zk = ::ZK.new(uri, chroot: :check)
  @registry = Hash.new { |hash, key| hash[key] = [] }
  @discovery_path = opts[:discovery_path] || '/discovery'
  @watched_services = {}
  @verify_retry = 0
  register
end
Instance Method Details
#check_service(service) ⇒ Object
Check a given Druid service. Currently only the coordinator and overlord services are tracked. This method creates a watcher on the service to detect changes.
This method gets the available nodes from the service's Zookeeper path. For each node returned, it tries to connect to the /status endpoint to check whether the node is available. It then stores the available nodes in @registry.
Parameters:
- service
-
String with the service to check
# File 'lib/druid_config/zk.rb', line 209
def check_service(service)
  # Only watch some services
  return if @watched_services.include?(service) ||
            !SERVICES.include?(service)
  # Start to watch this service
  watch_service(service)
  # New list of nodes
  new_list = []

  # Verify every node
  live = @zk.children(watch_path(service), watch: true)
  live.each do |name|
    # Verify a node
    uri = verify_node(name, service)
    # If != false store the URI
    new_list.push(name: name, uri: uri) if uri
  end
  # Register new service in the registry
  register_service(service, new_list)
end
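The filtering step in this method can be tried without a Zookeeper connection. The following is a minimal sketch, not the library's code: `collect_available` and the `verifier` lambda are hypothetical stand-ins for the loop above and for `verify_node`, and the node names and URI are made up.

```ruby
# Hypothetical helper mirroring the loop in check_service.
# `verifier` stands in for verify_node and returns a URI string or false.
def collect_available(names, verifier)
  new_list = []
  names.each do |name|
    uri = verifier.call(name)
    # Keep only the nodes whose verification returned a URI
    new_list.push(name: name, uri: uri) if uri
  end
  new_list
end

# Fake verifier: pretend that only 'node-a' answers on /status
verifier = lambda do |name|
  name == 'node-a' ? 'http://10.0.0.1:8081/' : false
end

available = collect_available(%w[node-a node-b], verifier)
puts available.inspect
```

Nodes that fail verification are left out of the list, so they never reach the registry.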
#check_services ⇒ Object
Check current services
# File 'lib/druid_config/zk.rb', line 135
def check_services
  $log.info("druid.zk checking services") if $log
  zk_services = @zk.children(@discovery_path, watch: true)

  (services - zk_services).each do |service|
    unregister_service(service)
  end

  zk_services.each do |service|
    check_service(service)
  end
end
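The reconciliation in this method relies on plain array difference. As a small illustration with made-up data (the two arrays are assumptions, standing in for the result of `services` and for the children of the discovery path):

```ruby
# Assumed values: what `services` returns and what Zookeeper currently lists
registered   = %w[coordinator overlord]
in_zookeeper = %w[coordinator broker]

# Services that are registered locally but no longer present in Zookeeper;
# check_services would call unregister_service on each of these
stale = registered - in_zookeeper

# Of the services present in Zookeeper, only those listed in SERVICES
# pass the guard at the top of check_service
tracked = in_zookeeper & %w[coordinator overlord]

puts "stale: #{stale.inspect}, tracked: #{tracked.inspect}"
```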
#close! ⇒ Object
Force the Zookeeper connection to close.
# File 'lib/druid_config/zk.rb', line 57
def close!
  $log.info('druid.zk shutting down') if $log
  @zk.close!
end
#coordinator ⇒ Object
Return the URI of a random available coordinator. Poor man's load balancing.
# File 'lib/druid_config/zk.rb', line 66
def coordinator
  random_node(COORDINATOR)
end
#overlord ⇒ Object
Return the URI of a random available overlord. Poor man's load balancing.
# File 'lib/druid_config/zk.rb', line 74
def overlord
  random_node(OVERLORD)
end
#random_node(service) ⇒ Object
Return the URI of a random registered node of a service, or nil if none is registered.
Parameters:
- service
-
String with the name of the service
# File 'lib/druid_config/zk.rb', line 85
def random_node(service)
  return nil if @registry[service].size == 0
  # Return a random broker from available brokers
  @registry[service].sample[:uri]
end
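The lookup can be reproduced with a plain Hash built the same way as `@registry` in the constructor. This is a sketch only: `pick` is a hypothetical free-standing copy of `random_node`, and the node entries are invented.

```ruby
# Same default-block Hash as @registry in the constructor
registry = Hash.new { |hash, key| hash[key] = [] }
registry['coordinator'] = [
  { name: 'node-a', uri: 'http://10.0.0.1:8081/' },
  { name: 'node-b', uri: 'http://10.0.0.2:8081/' }
]

# Hypothetical stand-alone version of random_node
def pick(registry, service)
  return nil if registry[service].size == 0
  # Array#sample picks one entry at random
  registry[service].sample[:uri]
end

picked  = pick(registry, 'coordinator') # one of the two URIs above
missing = pick(registry, 'overlord')    # nil, nothing registered

# The default block stores an empty list on first access,
# so the lookup above also added an 'overlord' key
keys_after = registry.keys
puts keys_after.inspect
```

One consequence of the default block is that looking up a service that was never registered adds it to the Hash with an empty list, so it would then show up in the keys.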
#register ⇒ Object
Load the data from Zookeeper
# File 'lib/druid_config/zk.rb', line 44
def register
  $log.info('druid.zk register discovery path') if $log
  @zk.on_expired_session { register }
  @zk.register(@discovery_path, only: :child) do
    $log.info('druid.zk got event on discovery path') if $log
    check_services
  end
  check_services
end
#register_service(service, brokers) ⇒ Object
Register a new service
# File 'lib/druid_config/zk.rb', line 94
def register_service(service, brokers)
  $log.info("druid.zk register", service: service, brokers: brokers) if $log
  # poor mans load balancing
  @registry[service] = brokers
end
#services ⇒ Object
Get all available services
# File 'lib/druid_config/zk.rb', line 233
def services
  @registry.keys
end
#to_s ⇒ Object
# File 'lib/druid_config/zk.rb', line 237
def to_s
  @registry.to_s
end
#unregister_service(service) ⇒ Object
Unregister a service
# File 'lib/druid_config/zk.rb', line 103
def unregister_service(service)
  $log.info("druid.zk unregister", service: service) if $log
  @registry.delete(service)
  unwatch_service(service)
end
#unwatch_service(service) ⇒ Object
Stop watching a service.
# File 'lib/druid_config/zk.rb', line 126
def unwatch_service(service)
  return unless @watched_services.include?(service)
  $log.info("druid.zk unwatch", service: service) if $log
  @watched_services.delete(service).unregister
end
#verify_node(name, service) ⇒ Object
Verify whether a Coordinator is available. To check, this method performs a GET request to the /status endpoint. If the request fails, it retries up to three times, waiting 0.8, 1.6, and 2.4 seconds before the successive retries.
Parameters:
- name
-
String with the name of the coordinator
- service
-
String with the service
Returns:
URI of the coordinator or false
# File 'lib/druid_config/zk.rb', line 162
def verify_node(name, service)
  $log.info("druid.zk verify", node: name, service: service) if $log
  info = @zk.get("#{watch_path(service)}/#{name}")
  node = JSON.parse(info[0])
  uri = "http://#{node['address']}:#{node['port']}/"
  # Try to get /status
  check = RestClient::Request.execute(
    method: :get, url: "#{uri}status",
    timeout: 5, open_timeout: 5
  )
  $log.info("druid.zk verified", uri: uri, sources: check) if $log
  return uri if check.code == 200
rescue
  return false unless @verify_retry < 3
  # Sleep some time and retry
  @verify_retry += 1
  sleep 0.8 * @verify_retry
  retry
ensure
  # Reset verify retries
  @verify_retry = 0
end
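The retry schedule can be checked in isolation. The sketch below is an illustration under assumptions, not the library's code: `with_linear_backoff` is a hypothetical helper, it keeps the counter in a local variable instead of `@verify_retry`, and the sleeper is injectable so the example runs without actually waiting.

```ruby
# Hypothetical helper reproducing the rescue/retry pattern of verify_node.
# `sleeper` defaults to a real sleep; pass a lambda to record delays instead.
def with_linear_backoff(max_retries: 3, step: 0.8, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    yield
  rescue StandardError
    # Give up once the allowed number of retries has been used
    return false unless attempt < max_retries
    attempt += 1
    # The delay grows linearly: step * 1, step * 2, step * 3
    sleeper.call(step * attempt)
    retry
  end
end

delays = []
calls = 0
result = with_linear_backoff(sleeper: ->(s) { delays << s.round(1) }) do
  calls += 1
  raise 'node down' # simulate a node that never answers
end

puts "result=#{result} calls=#{calls} delays=#{delays.inspect}"
```

With a block that always fails, the block runs once plus three retries, and the helper returns false, which matches the "URI or false" contract described above.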
#watch_path(service) ⇒ Object
Return the path of a service in Zookeeper.
Parameters:
- service
-
String with the name of the service
# File 'lib/druid_config/zk.rb', line 192
def watch_path(service)
  "#{@discovery_path}/#{service}"
end
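To make the resulting paths concrete, here is a small sketch that combines the constructor's default for `discovery_path` with the interpolation above. `service_path` is a hypothetical free-standing function, and '/druid/discovery' is an invented custom path.

```ruby
# Hypothetical stand-alone version of watch_path that also applies
# the constructor's default for the discovery path
def service_path(opts, service)
  discovery_path = opts[:discovery_path] || '/discovery'
  "#{discovery_path}/#{service}"
end

default_path = service_path({}, 'coordinator')
custom_path  = service_path({ discovery_path: '/druid/discovery' }, 'overlord')

puts default_path
puts custom_path
```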
#watch_service(service) ⇒ Object
Set a watcher for a service
# File 'lib/druid_config/zk.rb', line 112
def watch_service(service)
  return if @watched_services.include?(service)
  $log.info("druid.zk watch", service: service) if $log
  watch = @zk.register(watch_path(service), only: :child) do |event|
    $log.info("druid.zk got event on watch path for",
              service: service, event: event) if $log
    unwatch_service(service)
    check_service(service)
  end
  @watched_services[service] = watch
end