Class: DruidConfig::Cluster

Inherits:
Object
  • Object
show all
Includes:
Util, HTTParty
Defined in:
lib/druid_config/cluster.rb

Overview

Class to initialize the connection to Zookeeper

Instance Method Summary collapse

Methods included from Util

#pop_uri, #query_overlord, #secure_query, #stash_uri

Constructor Details

#initialize(zk_uri, options) ⇒ Cluster

Initialize the client to perform the queries

Parameters:

zk_uri

String with URI or URIs (sparated by comma) of Zookeeper

options

Hash with options:

- discovery_path: String with the discovery path of Druid


20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/druid_config/cluster.rb', line 20

def initialize(zk_uri, options)
  # Initialize the Client
  DruidConfig.client = DruidConfig::Client.new(zk_uri, options)

  # Used to check the number of retries on error
  @retries = 0

  # Update the base uri to perform queries
  self.class.base_uri(
    "#{DruidConfig.client.coordinator}"\
    "druid/coordinator/#{DruidConfig::Version::API_VERSION}")
end

Instance Method Details

#close!Object

Close connection with zookeeper



36
37
38
# File 'lib/druid_config/cluster.rb', line 36

def close!
  DruidConfig.client.close!
end

#complete_tasksObject

Return complete tasks

Returns:

Array of Tasks



331
332
333
334
335
336
337
338
339
340
# File 'lib/druid_config/cluster.rb', line 331

def complete_tasks
  tasks = []
  query_overlord do
    tasks = self.class.get('/completeTasks').map do |task|
      DruidConfig::Entities::Task.new(
        task['id'],
        task['statusCode'])
    end
  end
end

#datasource(datasource) ⇒ Object

Return a unique datasource

Parameters:

datasource:

String with the data source name

Returns:

DataSource instance



157
158
159
# File 'lib/druid_config/cluster.rb', line 157

def datasource(datasource)
  datasources.select { |el| el.name == datasource }
end

#datasourcesObject

Return all datasources

Returns:

Array of Datasource initialized.



136
137
138
139
140
141
142
143
144
145
# File 'lib/druid_config/cluster.rb', line 136

def datasources
  datasource_status = load_status
  secure_query do
    self.class.get('/datasources?simple').map do |data|
      DruidConfig::Entities::DataSource.new(
        data,
        datasource_status.select { |k, _| k == data['name'] }.values.first)
    end
  end
end

#default_datasourceObject

Return default datasource. This datasource hasn’t got metadata associated. It’s only used to read and apply default rules.



165
166
167
168
169
170
# File 'lib/druid_config/cluster.rb', line 165

def default_datasource
  DruidConfig::Entities::DataSource.new(
    { 'name' => DruidConfig::Entities::DataSource::DEFAULT_DATASOURCE,
      'properties' => {} },
    {})
end

#failed_tasksObject

Return failed completed tasks



345
346
347
# File 'lib/druid_config/cluster.rb', line 345

def failed_tasks
  complete_tasks.select(&:failed?)
end

#historicalsObject

Returns only historial nodes

Returns:

Array of Nodes



253
254
255
# File 'lib/druid_config/cluster.rb', line 253

def historicals
  servers.select { |node| node.type == :historical }
end

#leaderObject

Return the leader of the Druid cluster



67
68
69
70
71
# File 'lib/druid_config/cluster.rb', line 67

def leader
  secure_query do
    self.class.get('/leader').body
  end
end

#load_queue(params = '') ⇒ Object

Load queue of the cluster



85
86
87
88
89
# File 'lib/druid_config/cluster.rb', line 85

def load_queue(params = '')
  secure_query do
    self.class.get("/loadqueue?#{params}")
  end
end

#load_status(params = '') ⇒ Object

Load status of the cluster



76
77
78
79
80
# File 'lib/druid_config/cluster.rb', line 76

def load_status(params = '')
  secure_query do
    self.class.get("/loadstatus?#{params}")
  end
end

#metadata_datasources(params = '') ⇒ Object Also known as: mt_datasources

Return a Hash with metadata of datasources



97
98
99
100
101
# File 'lib/druid_config/cluster.rb', line 97

def (params = '')
  secure_query do
    self.class.get("/metadata/datasources?#{params}")
  end
end

#metadata_datasources_segments(data_source, segment = '') ⇒ Object Also known as: mt_datasources_segments

Return a Hash with metadata of segments

Parameters:

data_source

String with the name of the data source

segment

(Optional) Segment to search



114
115
116
117
118
119
120
121
122
123
# File 'lib/druid_config/cluster.rb', line 114

def (data_source, segment = '')
  end_point = "/metadata/datasources/#{data_source}/segments"
  secure_query do
    if segment.empty? || segment == 'full'
      self.class.get("#{end_point}?#{params}")
    else
      self.class.get("#{end_point}/#{params}")
    end
  end
end

#physical_serversObject Also known as: physical_nodes

URIs of the physical servers in the cluster

Returns:

Array of strings



238
239
240
241
242
# File 'lib/druid_config/cluster.rb', line 238

def physical_servers
  secure_query do
    @physical_servers ||= servers.map(&:host).uniq
  end
end

#physical_workersObject

URIs of the physical workers in the cluster



290
291
292
# File 'lib/druid_config/cluster.rb', line 290

def physical_workers
  @physical_workers ||= workers.map(&:host).uniq
end

#realtimesObject

Returns only realtime

Returns:

Array of Nodes



263
264
265
# File 'lib/druid_config/cluster.rb', line 263

def realtimes
  servers.select { |node| node.type == :realtime }
end

#reset!Object

Reset the client



43
44
45
46
47
48
# File 'lib/druid_config/cluster.rb', line 43

def reset!
  DruidConfig.client.reset!
  self.class.base_uri(
    "#{DruidConfig.client.coordinator}"\
    "druid/coordinator/#{DruidConfig::Version::API_VERSION}")
end

#rulesObject

Return the rules applied to a cluster



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/druid_config/cluster.rb', line 178

def rules
  rules = DruidConfig::Entities::RuleCollection.new
  secure_query do
    self.class.get('/rules').each do |datasource, ds_rules|
      ds_rules.each do |rule|
        rules << DruidConfig::Entities::Rule.parse(rule, datasource)
      end
    end
  end
  # Return initialized rules
  rules
end

#serversObject Also known as: nodes

Return all nodes of the cluster

Returns:

Array of node Objects



221
222
223
224
225
226
227
228
229
230
# File 'lib/druid_config/cluster.rb', line 221

def servers
  secure_query do
    queue = load_queue('simple')
    self.class.get('/servers?simple').map do |data|
      DruidConfig::Entities::Node.new(
        data,
        queue.select { |k, _| k == data['host'] }.values.first)
    end
  end
end

#servicesObject

Availabe services in the cluster

Parameters:

Array of Hash with the format:

{ server: [ services ], server2: [ services ], ... }


366
367
368
369
370
371
372
373
374
375
376
# File 'lib/druid_config/cluster.rb', line 366

def services
  return @services if @services
  services = {}
  physical_nodes.each { |node| services[node] = [] }
  # Load services
  realtimes.map(&:host).uniq.each { |r| services[r] << :realtime }
  historicals.map(&:host).uniq.each { |r| services[r] << :historical }
  physical_workers.each { |w| services[w] << :middleManager }
  # Return nodes
  @services = services
end

#success_tasksObject

Return success completed tasks



352
353
354
# File 'lib/druid_config/cluster.rb', line 352

def success_tasks
  complete_tasks.select(&:success?)
end

#task(id) ⇒ Object

Find a task



321
322
323
# File 'lib/druid_config/cluster.rb', line 321

def task(id)
  DruidConfig::Entities::Task.find(id)
end

#tiersObject

Return all tiers defined in the cluster

Returns:

Array of Tier instances



200
201
202
203
204
205
206
207
208
209
210
# File 'lib/druid_config/cluster.rb', line 200

def tiers
  current_nodes = servers
  # Initialize tiers
  secure_query do
    current_nodes.map(&:tier).uniq.map do |tier|
      DruidConfig::Entities::Tier.new(
        tier,
        current_nodes.select { |node| node.tier == tier })
    end
  end
end

#workersObject

Return all Workers (MiddleManager) of the cluster

Returns:

Array of Workers



273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/druid_config/cluster.rb', line 273

def workers
  workers = []
  # Perform a query
  query_overlord do
    secure_query do
      workers = self.class.get('/workers').map do |worker|
        DruidConfig::Entities::Worker.new(worker)
      end
    end
  end
  # Return
  workers
end