Class: Kafkat::Interface::Zookeeper

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkat/interface/zookeeper.rb

Defined Under Namespace

Classes: NotFoundError, WriteConflictError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Zookeeper

Returns a new instance of Zookeeper.



12
13
14
# File 'lib/kafkat/interface/zookeeper.rb', line 12

# Builds the interface from a configuration object.
#
# @param config an object responding to #zk_path; that is the only value
#   read from it here
def initialize(config)
  @zk_path = config.zk_path
end

Instance Attribute Details

#zk_path ⇒ Object (readonly)

Returns the value of attribute zk_path.



10
11
12
# File 'lib/kafkat/interface/zookeeper.rb', line 10

# Read-only accessor.
#
# @return the value stored in @zk_path (set from config.zk_path in #initialize)
def zk_path
  @zk_path
end

Instance Method Details

#get_broker(id) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/kafkat/interface/zookeeper.rb', line 38

# Looks up one broker's registration node and wraps it in a Broker.
#
# @param id the broker id; used to build the node path and stored on the result
# @return [Broker] built from the id plus the 'host' and 'port' keys of the
#   node's JSON payload
# @raise [NotFoundError] when the lookup raises ZK::Exceptions::NoNode
def get_broker(id)
  node = broker_path(id)
  info = JSON.parse(zk.get(node).first)
  Broker.new(id, *info.values_at('host', 'port'))
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_broker_ids ⇒ Object



16
17
18
# File 'lib/kafkat/interface/zookeeper.rb', line 16

# Lists the child node names found under brokers_path.
#
# @return whatever zk.children yields for that path — presumably an array of
#   broker id strings (get_brokers calls to_i on each entry); verify against
#   the ZK client
def get_broker_ids
  zk.children(brokers_path)
end

#get_brokers(ids = nil) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kafkat/interface/zookeeper.rb', line 20

# Fetches several brokers concurrently, one thread per id.
#
# Lookups are best-effort: any StandardError raised while fetching a broker is
# swallowed and that broker is simply left out of the result.
#
# @param ids list of broker ids (each is converted with to_i); when nil, the
#   children of brokers_path are used
# @return [Hash] integer broker id => result of get_broker for that id
def get_brokers(ids=nil)
  ids ||= zk.children(brokers_path)
  found = {}

  workers = ids.map do |raw_id|
    # The converted id is handed to the thread as its own block argument.
    Thread.new(raw_id.to_i) do |broker_id|
      begin
        found[broker_id] = get_broker(broker_id)
      rescue StandardError
        # Deliberately ignored: a broker that cannot be read is omitted.
      end
    end
  end
  workers.each(&:join)

  found
end

#get_controller ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/kafkat/interface/zookeeper.rb', line 117

# Resolves the broker named by the controller node.
#
# Reads the node at controller_path, takes the 'brokerid' key from its JSON
# payload and delegates to get_broker.
#
# @return the result of get_broker for the controller's broker id
# @raise [NotFoundError] when a lookup raises ZK::Exceptions::NoNode
def get_controller
  raw = zk.get(controller_path).first
  get_broker(JSON.parse(raw)['brokerid'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_topic(name) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/kafkat/interface/zookeeper.rb', line 80

# Loads a topic together with the state of each of its partitions.
#
# The topic node and the list of partition ids are read first; each
# partition's state node is then read in its own thread. Every read goes
# through pool.with_connection.
#
# @param name the topic name, used to build all node paths
# @return [Topic] built from the name and its partitions sorted by id; each
#   Partition carries the replicas listed under the topic JSON's 'partitions'
#   key plus the 'leader' and 'isr' values from its state node
# @raise [NotFoundError] when any lookup raises ZK::Exceptions::NoNode
def get_topic(name)
  topic_node = topic_path(name)
  partitions_node = topic_partitions_path(name)

  raw_topic = pool.with_connection { |cnx| cnx.get(topic_node).first }
  partition_ids = pool.with_connection { |cnx| cnx.children(partitions_node) }

  topic_json = JSON.parse(raw_topic)

  workers = partition_ids.map do |raw_id|
    Thread.new(raw_id.to_i) do |id|
      state_node = topic_partition_state_path(name, id)
      raw_state = pool.with_connection { |cnx| cnx.get(state_node).first }
      state = JSON.parse(raw_state)

      Partition.new(name, id, topic_json['partitions'][id.to_s], state['leader'], state['isr'])
    end
  end

  # Thread#value joins each worker and re-raises anything it raised, so a
  # NoNode inside a worker still reaches the rescue below.
  partitions = workers.map(&:value).sort_by(&:id)
  Topic.new(name, partitions)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end

#get_topic_names ⇒ Object



48
49
50
# File 'lib/kafkat/interface/zookeeper.rb', line 48

# Lists the child node names found under topics_path.
#
# @return whatever zk.children yields for that path
def get_topic_names
  zk.children(topics_path)
end

#get_topics(names = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/kafkat/interface/zookeeper.rb', line 52

# Fetches several topics concurrently, one thread per name.
#
# If any fetch raises a StandardError, the collected errors are printed to
# STDERR and the process exits with status 1 instead of returning.
#
# @param names list of topic names; when nil, the children of topics_path are
#   read through a pooled connection
# @return [Hash] topic name => result of get_topic for that name
def get_topics(names=nil)
  names = pool.with_connection { |cnx| cnx.children(topics_path) } if names.nil?

  failures = {}
  topics = {}

  workers = names.map do |name|
    Thread.new do
      begin
        topics[name] = get_topic(name)
      rescue StandardError => e
        failures[name] = e
      end
    end
  end
  workers.each(&:join)

  return topics if failures.empty?

  STDERR.print "ERROR: zk cmds failed on get_topics: \n#{failures.values.join("\n")}\n"
  exit 1
end

#write_leader(partition, broker_id) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/kafkat/interface/zookeeper.rb', line 126

# Rewrites the 'leader' field in a partition's state node.
#
# The node is read, its JSON payload is updated with the new leader and the
# result is written back with the version observed during the read.
#
# @param partition an object responding to #topic_name and #id, used to build
#   the state node path
# @param broker_id the value stored under the 'leader' key
# @return [nil] when the write is reported as successful
# @raise [WriteConflictError] when zk.set returns a falsy value
def write_leader(partition, broker_id)
  path = topic_partition_state_path(partition.topic_name, partition.id)
  string, stat = zk.get(path)

  partition_json = JSON.parse(string)
  partition_json['leader'] = broker_id
  new_string = JSON.dump(partition_json)

  # WriteConflictError is the conflict error this class declares; the
  # previously referenced ChangedDuringUpdateError is not among the classes
  # defined under this namespace, so the raise itself would have failed.
  # NOTE(review): the ZK client may raise ZK::Exceptions::BadVersion on a
  # version mismatch rather than return a falsy value — confirm, and decide
  # whether that should be translated to WriteConflictError as well.
  raise WriteConflictError unless zk.set(path, new_string, version: stat.version)
end