Class: Kazoo::Broker
- Inherits:
-
Object
- Object
- Kazoo::Broker
- Defined in:
- lib/kazoo/broker.rb
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#jmx_port ⇒ Object
readonly
Returns the value of attribute jmx_port.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
Class Method Summary collapse
- .from_json(cluster, id, json) ⇒ Object
Instance Method Summary collapse
- #addr ⇒ Object
- #critical?(replicas: 1) ⇒ Boolean
- #eql?(other) ⇒ Boolean (also: #==)
- #hash ⇒ Object
-
#initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker
constructor
A new instance of Broker.
- #inspect ⇒ Object
- #led_partitions ⇒ Object
- #replicated_partitions ⇒ Object
Constructor Details
#initialize(cluster, id, host, port, jmx_port: nil) ⇒ Broker
Returns a new instance of Broker.
5 6 7 8 9 |
# File 'lib/kazoo/broker.rb', line 5
# Builds a broker by storing each argument in the instance variable of the
# same name.
#
# @param cluster [Object] stored in @cluster
# @param id [Object] stored in @id
# @param host [Object] stored in @host
# @param port [Object] stored in @port
# @param jmx_port [Object, nil] stored in @jmx_port; nil when not given
def initialize(cluster, id, host, port, jmx_port: nil)
  @cluster  = cluster
  @id       = id
  @host     = host
  @port     = port
  @jmx_port = jmx_port
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
3 4 5 |
# File 'lib/kazoo/broker.rb', line 3
# Reader for the cluster attribute.
#
# @return [Object] the current value of @cluster
def cluster
  @cluster
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
3 4 5 |
# File 'lib/kazoo/broker.rb', line 3
# Reader for the host attribute.
#
# @return [Object] the current value of @host
def host
  @host
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
3 4 5 |
# File 'lib/kazoo/broker.rb', line 3
# Reader for the id attribute.
#
# @return [Object] the current value of @id
def id
  @id
end
#jmx_port ⇒ Object (readonly)
Returns the value of attribute jmx_port.
3 4 5 |
# File 'lib/kazoo/broker.rb', line 3
# Reader for the jmx_port attribute.
#
# @return [Object, nil] the current value of @jmx_port
def jmx_port
  @jmx_port
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
3 4 5 |
# File 'lib/kazoo/broker.rb', line 3
# Reader for the port attribute.
#
# @return [Object] the current value of @port
def port
  @port
end
Class Method Details
.from_json(cluster, id, json) ⇒ Object
68 69 70 |
# File 'lib/kazoo/broker.rb', line 68
# Builds a broker from a hash-like +json+ object.
#
# @param cluster [Object] passed through to the constructor unchanged
# @param id [#to_i] broker id; converted with +to_i+
# @param json [#fetch] must contain 'host' and 'port'; 'jmx_port' is optional
# @return [Object] whatever +new+ returns
# @raise [KeyError] when 'host' or 'port' is missing (raised by +fetch+ on a Hash)
def self.from_json(cluster, id, json)
  # Same evaluation order as the one-line form: id first, then the fetches.
  broker_id = id.to_i
  host      = json.fetch('host')
  port      = json.fetch('port')
  jmx_port  = json.fetch('jmx_port', nil)

  new(cluster, broker_id, host, port, jmx_port: jmx_port)
end
Instance Method Details
#addr ⇒ Object
50 51 52 |
# File 'lib/kazoo/broker.rb', line 50
# Formats the broker address as host and port separated by a colon.
#
# @return [String] "<host>:<port>"
def addr
  format('%s:%s', host, port)
end
#critical?(replicas: 1) ⇒ Boolean
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/kazoo/broker.rb', line 37
# Reports whether any partition replicated on this broker would have fewer
# than +replicas+ in-sync replicas left once this broker is excluded.
#
# Each partition's ISR is read in its own thread (presumably because
# +partition.isr+ does remote I/O -- confirm against Kazoo::Partition).
# The workers are kept in a plain Array and collected with Thread#value
# rather than ThreadGroup#list: the latter omits threads that have already
# terminated, so an exception raised by a fast-failing worker would never be
# re-raised and the method could silently answer +false+.
#
# @param replicas [Integer] minimum number of other in-sync replicas required
# @return [Boolean] true if at least one partition falls below +replicas+
# @raise [StandardError] whatever a worker raised while reading the ISR
def critical?(replicas: 1)
  workers = replicated_partitions.map do |partition|
    Thread.new do
      remaining_isr = partition.isr.reject { |replica| replica == self }
      remaining_isr.length < replicas
    end
  end

  # Collect every worker before deciding, so no thread is left running;
  # Thread#value joins and re-raises any exception the worker died with.
  workers.map(&:value).any?
end
#eql?(other) ⇒ Boolean Also known as: ==
54 55 56 |
# File 'lib/kazoo/broker.rb', line 54
# Two brokers are equal when both are Kazoo::Broker instances with equal
# cluster and equal id.
#
# @param other [Object] object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.is_a?(Kazoo::Broker)

  other.cluster == cluster && other.id == id
end
#hash ⇒ Object
60 61 62 |
# File 'lib/kazoo/broker.rb', line 60
# Hash code derived from the cluster and the id, the same two fields
# compared by #eql?.
#
# @return [Integer]
def hash
  identity = [cluster, id]
  identity.hash
end
#inspect ⇒ Object
64 65 66 |
# File 'lib/kazoo/broker.rb', line 64 def inspect "#<Kazoo::Broker id=#{id} addr=#{addr}>" end |
#led_partitions ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/kazoo/broker.rb', line 11
# Lists the partitions of this broker's cluster whose leader is this broker.
#
# Every partition is checked in its own thread (presumably because
# +partition.leader+ does remote I/O -- confirm against Kazoo::Partition).
# The workers are kept in a plain Array and collected with Thread#value
# rather than ThreadGroup#list: the latter omits threads that have already
# terminated, so an exception raised by a fast-failing worker would never be
# re-raised and the partition would silently be missing from the result.
#
# @return [Array] matching partitions, in the order of +cluster.partitions+
# @raise [StandardError] whatever a worker raised while reading the leader
def led_partitions
  workers = cluster.partitions.map do |partition|
    Thread.new { partition if partition.leader == self }
  end

  # Thread#value joins the worker and re-raises any exception it died with.
  workers.map(&:value).compact
end
#replicated_partitions ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/kazoo/broker.rb', line 24
# Lists the partitions of this broker's cluster whose replica set includes
# this broker.
#
# Every partition is checked in its own thread (presumably because
# +partition.replicas+ does remote I/O -- confirm against Kazoo::Partition).
# The workers are kept in a plain Array and collected with Thread#value
# rather than ThreadGroup#list: the latter omits threads that have already
# terminated, so an exception raised by a fast-failing worker would never be
# re-raised and the partition would silently be missing from the result.
#
# @return [Array] matching partitions, in the order of +cluster.partitions+
# @raise [StandardError] whatever a worker raised while reading the replicas
def replicated_partitions
  workers = cluster.partitions.map do |partition|
    Thread.new { partition if partition.replicas.include?(self) }
  end

  # Thread#value joins the worker and re-raises any exception it died with.
  workers.map(&:value).compact
end