Class: DruidConfig::Cluster
- Inherits:
-
Object
- Object
- DruidConfig::Cluster
- Includes:
- Util, HTTParty
- Defined in:
- lib/druid_config/cluster.rb
Overview
Class to initialize the connection to Zookeeper
Instance Method Summary collapse
-
#close! ⇒ Object
Close connection with zookeeper.
-
#complete_tasks ⇒ Object
Return complete tasks.
-
#datasource(datasource) ⇒ Object
Return a unique datasource.
-
#datasources ⇒ Object
Return all datasources.
-
#default_datasource ⇒ Object
Return default datasource.
-
#failed_tasks ⇒ Object
Return failed completed tasks.
-
#historicals ⇒ Object
Returns only historial nodes.
-
#initialize(zk_uri, options) ⇒ Cluster
constructor
Initialize the client to perform the queries.
-
#leader ⇒ Object
Return the leader of the Druid cluster.
-
#load_queue(params = '') ⇒ Object
Load queue of the cluster.
-
#load_status(params = '') ⇒ Object
Load status of the cluster.
-
#metadata_datasources(params = '') ⇒ Object
(also: #mt_datasources)
Return a Hash with metadata of datasources.
-
#metadata_datasources_segments(data_source, segment = '') ⇒ Object
(also: #mt_datasources_segments)
Return a Hash with metadata of segments.
-
#physical_servers ⇒ Object
(also: #physical_nodes)
URIs of the physical servers in the cluster.
-
#physical_workers ⇒ Object
URIs of the physical workers in the cluster.
-
#realtimes ⇒ Object
Returns only realtime.
-
#reset! ⇒ Object
Reset the client.
-
#rules ⇒ Object
Return the rules applied to a cluster.
-
#servers ⇒ Object
(also: #nodes)
Return all nodes of the cluster.
-
#services ⇒ Object
Availabe services in the cluster.
-
#success_tasks ⇒ Object
Return success completed tasks.
-
#task(id) ⇒ Object
Find a task.
-
#tiers ⇒ Object
Return all tiers defined in the cluster.
-
#workers ⇒ Object
Return all Workers (MiddleManager) of the cluster.
Methods included from Util
#pop_uri, #query_overlord, #secure_query, #stash_uri
Constructor Details
#initialize(zk_uri, options) ⇒ Cluster
Initialize the client to perform the queries
Parameters:
- zk_uri
-
String with URI or URIs (sparated by comma) of Zookeeper
- options
-
Hash with options:
- discovery_path: String with the discovery path of Druid
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/druid_config/cluster.rb', line 20 def initialize(zk_uri, ) # Initialize the Client DruidConfig.client = DruidConfig::Client.new(zk_uri, ) # Used to check the number of retries on error @retries = 0 # Update the base uri to perform queries self.class.base_uri( "#{DruidConfig.client.coordinator}"\ "druid/coordinator/#{DruidConfig::Version::API_VERSION}") end |
Instance Method Details
#close! ⇒ Object
Close connection with zookeeper
36 37 38 |
# File 'lib/druid_config/cluster.rb', line 36 def close! DruidConfig.client.close! end |
#complete_tasks ⇒ Object
Return complete tasks
Returns:
Array of Tasks
331 332 333 334 335 336 337 338 339 340 |
# File 'lib/druid_config/cluster.rb', line 331 def complete_tasks tasks = [] query_overlord do tasks = self.class.get('/completeTasks').map do |task| DruidConfig::Entities::Task.new( task['id'], task['statusCode']) end end end |
#datasource(datasource) ⇒ Object
Return a unique datasource
Parameters:
datasource:
String with the data source name
Returns:
DataSource instance
157 158 159 |
# File 'lib/druid_config/cluster.rb', line 157 def datasource(datasource) datasources.select { |el| el.name == datasource } end |
#datasources ⇒ Object
Return all datasources
Returns:
Array of Datasource initialized.
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/druid_config/cluster.rb', line 136 def datasources datasource_status = load_status secure_query do self.class.get('/datasources?simple').map do |data| DruidConfig::Entities::DataSource.new( data, datasource_status.select { |k, _| k == data['name'] }.values.first) end end end |
#default_datasource ⇒ Object
Return default datasource. This datasource hasn’t got metadata associated. It’s only used to read and apply default rules.
165 166 167 168 169 170 |
# File 'lib/druid_config/cluster.rb', line 165 def default_datasource DruidConfig::Entities::DataSource.new( { 'name' => DruidConfig::Entities::DataSource::DEFAULT_DATASOURCE, 'properties' => {} }, {}) end |
#failed_tasks ⇒ Object
Return failed completed tasks
345 346 347 |
# File 'lib/druid_config/cluster.rb', line 345 def failed_tasks complete_tasks.select(&:failed?) end |
#historicals ⇒ Object
Returns only historial nodes
Returns:
Array of Nodes
253 254 255 |
# File 'lib/druid_config/cluster.rb', line 253 def historicals servers.select { |node| node.type == :historical } end |
#leader ⇒ Object
Return the leader of the Druid cluster
67 68 69 70 71 |
# File 'lib/druid_config/cluster.rb', line 67 def leader secure_query do self.class.get('/leader').body end end |
#load_queue(params = '') ⇒ Object
Load queue of the cluster
85 86 87 88 89 |
# File 'lib/druid_config/cluster.rb', line 85 def load_queue(params = '') secure_query do self.class.get("/loadqueue?#{params}") end end |
#load_status(params = '') ⇒ Object
Load status of the cluster
76 77 78 79 80 |
# File 'lib/druid_config/cluster.rb', line 76 def load_status(params = '') secure_query do self.class.get("/loadstatus?#{params}") end end |
#metadata_datasources(params = '') ⇒ Object Also known as: mt_datasources
Return a Hash with metadata of datasources
97 98 99 100 101 |
# File 'lib/druid_config/cluster.rb', line 97 def (params = '') secure_query do self.class.get("/metadata/datasources?#{params}") end end |
#metadata_datasources_segments(data_source, segment = '') ⇒ Object Also known as: mt_datasources_segments
Return a Hash with metadata of segments
Parameters:
- data_source
-
String with the name of the data source
- segment
-
(Optional) Segment to search
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/druid_config/cluster.rb', line 114 def (data_source, segment = '') end_point = "/metadata/datasources/#{data_source}/segments" secure_query do if segment.empty? || segment == 'full' self.class.get("#{end_point}?#{params}") else self.class.get("#{end_point}/#{params}") end end end |
#physical_servers ⇒ Object Also known as: physical_nodes
URIs of the physical servers in the cluster
Returns:
Array of strings
238 239 240 241 242 |
# File 'lib/druid_config/cluster.rb', line 238 def physical_servers secure_query do @physical_servers ||= servers.map(&:host).uniq end end |
#physical_workers ⇒ Object
URIs of the physical workers in the cluster
290 291 292 |
# File 'lib/druid_config/cluster.rb', line 290 def physical_workers @physical_workers ||= workers.map(&:host).uniq end |
#realtimes ⇒ Object
Returns only realtime
Returns:
Array of Nodes
263 264 265 |
# File 'lib/druid_config/cluster.rb', line 263 def realtimes servers.select { |node| node.type == :realtime } end |
#reset! ⇒ Object
Reset the client
43 44 45 46 47 48 |
# File 'lib/druid_config/cluster.rb', line 43 def reset! DruidConfig.client.reset! self.class.base_uri( "#{DruidConfig.client.coordinator}"\ "druid/coordinator/#{DruidConfig::Version::API_VERSION}") end |
#rules ⇒ Object
Return the rules applied to a cluster
178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/druid_config/cluster.rb', line 178 def rules rules = DruidConfig::Entities::RuleCollection.new secure_query do self.class.get('/rules').each do |datasource, ds_rules| ds_rules.each do |rule| rules << DruidConfig::Entities::Rule.parse(rule, datasource) end end end # Return initialized rules rules end |
#servers ⇒ Object Also known as: nodes
Return all nodes of the cluster
Returns:
Array of node Objects
221 222 223 224 225 226 227 228 229 230 |
# File 'lib/druid_config/cluster.rb', line 221 def servers secure_query do queue = load_queue('simple') self.class.get('/servers?simple').map do |data| DruidConfig::Entities::Node.new( data, queue.select { |k, _| k == data['host'] }.values.first) end end end |
#services ⇒ Object
Availabe services in the cluster
Parameters:
Array of Hash with the format:
{ server: [ services ], server2: [ services ], ... }
366 367 368 369 370 371 372 373 374 375 376 |
# File 'lib/druid_config/cluster.rb', line 366 def services return @services if @services services = {} physical_nodes.each { |node| services[node] = [] } # Load services realtimes.map(&:host).uniq.each { |r| services[r] << :realtime } historicals.map(&:host).uniq.each { |r| services[r] << :historical } physical_workers.each { |w| services[w] << :middleManager } # Return nodes @services = services end |
#success_tasks ⇒ Object
Return success completed tasks
352 353 354 |
# File 'lib/druid_config/cluster.rb', line 352 def success_tasks complete_tasks.select(&:success?) end |
#task(id) ⇒ Object
Find a task
321 322 323 |
# File 'lib/druid_config/cluster.rb', line 321 def task(id) DruidConfig::Entities::Task.find(id) end |
#tiers ⇒ Object
Return all tiers defined in the cluster
Returns:
Array of Tier instances
200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/druid_config/cluster.rb', line 200 def tiers current_nodes = servers # Initialize tiers secure_query do current_nodes.map(&:tier).uniq.map do |tier| DruidConfig::Entities::Tier.new( tier, current_nodes.select { |node| node.tier == tier }) end end end |
#workers ⇒ Object
Return all Workers (MiddleManager) of the cluster
Returns:
Array of Workers
273 274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/druid_config/cluster.rb', line 273 def workers workers = [] # Perform a query query_overlord do secure_query do workers = self.class.get('/workers').map do |worker| DruidConfig::Entities::Worker.new(worker) end end end # Return workers end |