Class: Druid::ZooHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/druid/zoo_handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(uri, opts = {}) ⇒ ZooHandler

Returns a new instance of ZooHandler.



8
9
10
11
12
13
14
15
# File 'lib/druid/zoo_handler.rb', line 8

def initialize(uri, opts = {})
  @zk = ZK.new uri, :chroot => :check
  @registry = Hash.new {|hash,key| hash[key] = Array.new }
  @discovery_path = opts[:discovery_path] || '/discoveryPath'
  @watched_services = Hash.new

  init_zookeeper
end

Instance Method Details

#check_service(service) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/druid/zoo_handler.rb', line 49

def check_service(service)
  unless @watched_services.include? service
    watchPath = "#{@discovery_path}/#{service}"
    @watched_services[service] = @zk.register(watchPath, :only => :child) do |event|
      old_handler = @watched_services.delete(service)
      if old_handler
        old_handler.unregister
      end
      check_service service
    end

    known = @registry[service].map{ |node| node[:name] } rescue []
    live = @zk.children(watchPath, :watch => true)

    # copy the unchanged entries
    new_list = @registry[service].select{ |node| live.include? node[:name] } rescue []

    # verify the new entries to be living brokers
    (live - known).each do |name|
      info = @zk.get "#{watchPath}/#{name}"
      node = JSON.parse(info[0])
      uri =  "http://#{node['address']}:#{node['port']}/druid/v2/"

      begin
        check_uri = "#{uri}datasources/"

        check = RestClient::Request.execute({
          :method => :get,
          :url => check_uri,
          :timeout => 5,
          :open_timeout => 5
        })

        if check.code == 200
          new_list.push({
            :name => name,
            :uri => uri,
            :data_sources => JSON.parse(check.to_str)
          })
        else
        end
      rescue
      end
    end

    if !new_list.empty?
      # poor mans load balancing
      @registry[service] = new_list.shuffle
    else
      # don't show services w/o active brokers
      @registry.delete service
    end
  end
end

#check_servicesObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/druid/zoo_handler.rb', line 33

def check_services
  zk_services = @zk.children @discovery_path, :watch => true

  #remove deprecated services
  (services - zk_services).each do |old_service|
    @registry.delete old_service
    if @watched_services.include? old_service
      @watched_services.delete(old_service).unregister
    end
  end

  zk_services.each do |service|
    check_service service unless @watched_services.include? service
  end
end

#close!Object



29
30
31
# File 'lib/druid/zoo_handler.rb', line 29

def close!
  @zk.close!
end

#data_sourcesObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/druid/zoo_handler.rb', line 108

def data_sources
  result = Hash.new { |hash, key| hash[key] = [] }

  @registry.each do |service, brokers|
    brokers.each do |broker|
      broker[:data_sources].each do |data_source|
        result["#{service}/#{data_source}"] << broker[:uri]
      end
    end
  end
  result.each do |source, uris|
    result[source] = uris.sample if uris.respond_to?(:sample)
  end

  result
end

#init_zookeeperObject



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/druid/zoo_handler.rb', line 17

def init_zookeeper
  @zk.on_expired_session do
    init_zookeeper
  end

  @zk.register(@discovery_path, :only => :child) do |event|
    check_services
  end

  check_services
end

#servicesObject



104
105
106
# File 'lib/druid/zoo_handler.rb', line 104

def services
  @registry.keys
end

#to_sObject



125
126
127
# File 'lib/druid/zoo_handler.rb', line 125

def to_s
  @registry.to_s
end