Class: Kazoo::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/broker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker

Returns a new instance of Broker.



5
6
7
8
9
# File 'lib/kazoo/broker.rb', line 5

def initialize(cluster, id, host, port, jmx_port: nil)
  @cluster = cluster
  @id, @host, @port = id, host, port
  @jmx_port = jmx_port
end

Instance Attribute Details

#clusterObject (readonly)

Returns the value of attribute cluster.



3
4
5
# File 'lib/kazoo/broker.rb', line 3

def cluster
  @cluster
end

#hostObject (readonly)

Returns the value of attribute host.



3
4
5
# File 'lib/kazoo/broker.rb', line 3

def host
  @host
end

#idObject (readonly)

Returns the value of attribute id.



3
4
5
# File 'lib/kazoo/broker.rb', line 3

def id
  @id
end

#jmx_portObject (readonly)

Returns the value of attribute jmx_port.



3
4
5
# File 'lib/kazoo/broker.rb', line 3

def jmx_port
  @jmx_port
end

#portObject (readonly)

Returns the value of attribute port.



3
4
5
# File 'lib/kazoo/broker.rb', line 3

def port
  @port
end

Class Method Details

.from_json(cluster, id, json) ⇒ Object



64
65
66
# File 'lib/kazoo/broker.rb', line 64

def self.from_json(cluster, id, json)
  new(cluster, id.to_i, json.fetch('host'), json.fetch('port'), jmx_port: json.fetch('jmx_port', nil))
end

Instance Method Details

#addrObject



50
51
52
# File 'lib/kazoo/broker.rb', line 50

def addr
  "#{host}:#{port}"
end

#critical?(replicas: 1) ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/kazoo/broker.rb', line 37

def critical?(replicas: 1)
  result, threads, mutex = false, ThreadGroup.new, Mutex.new
  replicated_partitions.each do |partition|
    t = Thread.new do
      isr = partition.isr.reject { |r| r == self }
      mutex.synchronize { result = true if isr.length < replicas }
    end
    threads.add(t)
  end
  threads.list.each(&:join)
  result
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


54
55
56
# File 'lib/kazoo/broker.rb', line 54

def eql?(other)
  other.is_a?(Kazoo::Broker) && other.cluster == self.cluster && other.id == self.id
end

#hashObject



60
61
62
# File 'lib/kazoo/broker.rb', line 60

def hash
  [self.cluster, self.id].hash
end

#led_partitionsObject



11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/kazoo/broker.rb', line 11

def led_partitions
  result, threads, mutex = [], ThreadGroup.new, Mutex.new
  cluster.partitions.each do |partition|
    t = Thread.new do
      select = partition.leader == self
      mutex.synchronize { result << partition } if select
    end
    threads.add(t)
  end
  threads.list.each(&:join)
  result
end

#replicated_partitionsObject



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/kazoo/broker.rb', line 24

def replicated_partitions
  result, threads, mutex = [], ThreadGroup.new, Mutex.new
  cluster.partitions.each do |partition|
    t = Thread.new do
      select = partition.replicas.include?(self)
      mutex.synchronize { result << partition } if select
    end
    threads.add(t)
  end
  threads.list.each(&:join)
  result
end