Class: Hermann::Discovery::Zookeeper

Inherits:
Object
Defined in:
lib/hermann/discovery/zookeeper.rb

Overview

Communicates with ZooKeeper to discover Kafka broker IDs.

Defined Under Namespace

Classes: CuratorImpl, ZkGemImpl

Constant Summary

BROKERS_PATH =
"/brokers/ids".freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(zookeepers) ⇒ Zookeeper

Returns a new instance of Zookeeper.



# File 'lib/hermann/discovery/zookeeper.rb', line 18

def initialize(zookeepers)
  @zookeepers = zookeepers
  @impl = nil
  # Prefer Curator when it is usable; otherwise fall back to the `zk` gem.
  if CuratorImpl.usable?
    @impl = CuratorImpl.new(zookeepers)
  elsif ZkGemImpl.usable?
    @impl = ZkGemImpl.new(zookeepers)
  else
    raise Hermann::Errors::GeneralError,
          "Could not find a usable Zookeeper implementation, please make sure " \
          "either the `zk` gem is installed or Curator is on the classpath"
  end
end
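
The constructor picks the first usable backend and raises if none is available. The following is a minimal, self-contained sketch of that selection pattern. `FakeCurator`, `FakeZkGem`, `SketchGeneralError`, and `pick_impl` are hypothetical stand-ins invented for illustration; they are not part of Hermann, and the real `usable?` checks are not shown in this page.

```ruby
# Hypothetical stand-in for Hermann::Errors::GeneralError.
class SketchGeneralError < StandardError; end

# Hypothetical backend that reports itself as unavailable,
# as CuratorImpl presumably would when Curator is not on the classpath.
class FakeCurator
  def self.usable?
    false
  end

  def initialize(zookeepers)
    @zookeepers = zookeepers
  end
end

# Hypothetical backend that reports itself as available.
class FakeZkGem
  def self.usable?
    true
  end

  def initialize(zookeepers)
    @zookeepers = zookeepers
  end
end

# Returns an instance of the first candidate whose usable? is true,
# mirroring the if/elsif/else chain in the constructor.
def pick_impl(zookeepers, candidates)
  chosen = candidates.find(&:usable?)
  if chosen.nil?
    raise SketchGeneralError, "Could not find a usable Zookeeper implementation"
  end
  chosen.new(zookeepers)
end

impl = pick_impl("localhost:2181", [FakeCurator, FakeZkGem])
puts impl.class # prints "FakeZkGem"
```

Ordering the candidates in a list keeps the preference (Curator first, then the `zk` gem) explicit, which is the same priority the constructor encodes.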

Instance Attribute Details

#impl ⇒ Object (readonly)

Returns the value of attribute impl.



# File 'lib/hermann/discovery/zookeeper.rb', line 15

def impl
  @impl
end

#zookeepers ⇒ Object (readonly)

Returns the value of attribute zookeepers.



# File 'lib/hermann/discovery/zookeeper.rb', line 15

def zookeepers
  @zookeepers
end

Instance Method Details

#get_brokers(timeout = 0) ⇒ Array

Gets the list of brokers registered in ZooKeeper. Raises Hermann::Errors::NoBrokersError if no brokers are found.

Parameters:

  • timeout (Fixnum) (defaults to: 0)

    Timeout for connecting to ZooKeeper. Per the ZooKeeper documentation, a session timeout must be a minimum of 2 times the tickTime (as set in the server configuration) and a maximum of 20 times the tickTime.

Returns:

  • (Array)

    List of brokers from ZK



# File 'lib/hermann/discovery/zookeeper.rb', line 38

def get_brokers(timeout=0)
  brokers = impl.brokers(timeout).map { |b| format_broker_from_znode(b) }

  # An empty list means no brokers are registered under BROKERS_PATH.
  raise Hermann::Errors::NoBrokersError if brokers.empty?
  brokers
end
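
The flow of `get_brokers` (fetch znodes, format each one, fail on an empty result) can be sketched without a running ZooKeeper. Everything below is a hypothetical stand-in: `StubImpl` replaces the real backend, `SketchNoBrokersError` replaces `Hermann::Errors::NoBrokersError`, and `format_broker` assumes each znode holds JSON with `host` and `port` fields rendered as `host:port`. The real `format_broker_from_znode` is not shown in this page, so its exact output is an assumption.

```ruby
require 'json'

# Hypothetical stand-in for Hermann::Errors::NoBrokersError.
class SketchNoBrokersError < StandardError; end

# Hypothetical backend that returns canned znode payloads
# instead of reading from ZooKeeper.
class StubImpl
  def initialize(znodes)
    @znodes = znodes
  end

  # The timeout is accepted for signature parity but ignored here.
  def brokers(_timeout = 0)
    @znodes
  end
end

# Assumed formatting: parse the znode JSON and emit "host:port".
def format_broker(znode)
  data = JSON.parse(znode)
  "#{data['host']}:#{data['port']}"
end

# Mirrors the shape of get_brokers: map, then fail on an empty list.
def sketch_get_brokers(impl, timeout = 0)
  brokers = impl.brokers(timeout).map { |b| format_broker(b) }
  raise SketchNoBrokersError if brokers.empty?
  brokers
end

znodes = [
  '{"host":"kafka1","port":9092}',
  '{"host":"kafka2","port":9092}'
]
brokers = sketch_get_brokers(StubImpl.new(znodes))
puts brokers.join(",") # prints "kafka1:9092,kafka2:9092"
```

Callers that need a single comma-separated broker string can join the returned array as shown in the last line.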