Class: TestKafka::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/test_kafka/broker.rb

Constant Summary collapse

# Baseline broker properties, keyed by property name. The hash is frozen, so
# per-broker settings are produced by merging overrides into a copy rather
# than by mutating this constant.
DEFAULT_PROPERTIES = {
  # broker identity
  "broker.id"                           => 0,
  "port"                                => 9092,

  # num.*.threads
  "num.network.threads"                 => 2,
  "num.io.threads"                      => 2,

  # socket.*
  "socket.send.buffer.bytes"            => 1_048_576,   # 1024 * 1024
  "socket.receive.buffer.bytes"         => 1_048_576,   # 1024 * 1024
  "socket.request.max.bytes"            => 104_857_600, # 100 * 1024 * 1024

  # log.* and partitions
  "log.dir"                             => "/tmp/kafka-logs",
  "num.partitions"                      => 1,
  "log.flush.interval.messages"         => 10_000,
  "log.flush.interval.ms"               => 1_000,
  "log.retention.hours"                 => 168,         # 7 * 24
  "log.segment.bytes"                   => 536_870_912, # 512 * 1024 * 1024
  "log.cleanup.interval.mins"           => 1,

  # zookeeper.*
  "zookeeper.connect"                   => "localhost:2181",
  "zookeeper.connection.timeout.ms"     => 1_000_000,

  # kafka.metrics.* / kafka.csv.metrics.*
  "kafka.metrics.polling.interval.secs" => 5,
  "kafka.metrics.reporters"             => "kafka.metrics.KafkaCSVMetricsReporter",
  "kafka.csv.metrics.dir"               => "/tmp/kafka_metrics",
  "kafka.csv.metrics.reporter.enabled"  => "false"
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kafka_path, tmp_dir, port, zk_port, broker_id = 0, partition_count = 1) ⇒ Broker

Returns a new instance of Broker.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/test_kafka/broker.rb', line 28

# Records the broker id and port, then builds the JavaRunner for this broker
# with DEFAULT_PROPERTIES overridden by the per-broker values below.
#
# @param kafka_path [Object] forwarded unchanged to JavaRunner.new
#   (presumably the Kafka installation path -- confirm in JavaRunner)
# @param tmp_dir [#to_s] forwarded to JavaRunner.new; also the parent
#   directory used for the "log.dir" and "kafka.csv.metrics.dir" properties
# @param port [Object] stored in @port, forwarded to JavaRunner.new and
#   written to the "port" property
# @param zk_port [#to_s] port interpolated into "zookeeper.connect" as
#   "localhost:<zk_port>"
# @param broker_id [Object] stored in @broker_id; used for "broker.id", the
#   runner name ("broker_<id>") and the log directory suffix
# @param partition_count [Object] value written to "num.partitions"
def initialize(kafka_path, tmp_dir, port, zk_port, broker_id=0, partition_count=1)
  @broker_id = broker_id
  @port = port

  # Only these six keys differ per broker; every other property keeps its default.
  overrides = {
    "broker.id"             => broker_id,
    "port"                  => port,
    "log.dir"               => "#{tmp_dir}/kafka-logs_#{broker_id}",
    "kafka.csv.metrics.dir" => "#{tmp_dir}/kafka_metrics",
    "num.partitions"        => partition_count,
    "zookeeper.connect"     => "localhost:#{zk_port}"
  }
  properties = DEFAULT_PROPERTIES.merge(overrides)

  runner_name = "broker_#{broker_id}"
  @jr = JavaRunner.new(runner_name, tmp_dir, "kafka.Kafka", port, kafka_path, properties)
end

Instance Attribute Details

#broker_id ⇒ Object (readonly)

Returns the value of attribute broker_id.



46
47
48
# File 'lib/test_kafka/broker.rb', line 46

# Reader for the broker id.
#
# @return [Object] the value held in @broker_id, i.e. the +broker_id+
#   argument the constructor received (0 when it was omitted)
def broker_id
  @broker_id
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



46
47
48
# File 'lib/test_kafka/broker.rb', line 46

# Reader for the broker port.
#
# @return [Object] the value held in @port, i.e. the +port+ argument the
#   constructor received
def port
  @port
end

Instance Method Details

#pid ⇒ Object



56
57
58
# File 'lib/test_kafka/broker.rb', line 56

# Delegates to the runner held in @jr.
#
# @return [Object] whatever +@jr.pid+ returns (presumably the process id of
#   the launched JVM -- confirm in JavaRunner)
def pid
  @jr.pid
end

#start ⇒ Object



48
49
50
# File 'lib/test_kafka/broker.rb', line 48

# Delegates to the runner held in @jr; the broker itself does no extra work
# here. What starting involves is defined by JavaRunner#start.
#
# @return [Object] whatever +@jr.start+ returns
def start
  @jr.start
end

#stop ⇒ Object



52
53
54
# File 'lib/test_kafka/broker.rb', line 52

# Delegates to the runner held in @jr; the broker itself does no extra work
# here. What stopping involves is defined by JavaRunner#stop.
#
# @return [Object] whatever +@jr.stop+ returns
def stop
  @jr.stop
end

#with_interruption(&block) ⇒ Object



60
61
62
# File 'lib/test_kafka/broker.rb', line 60

# Hands the given block to the runner's +with_interruption+ unchanged.
# NOTE(review): the name suggests the runner is interrupted while the block
# runs, but that behavior lives in JavaRunner -- confirm there.
#
# @param block [Proc] block forwarded as-is to +@jr.with_interruption+
# @return [Object] whatever +@jr.with_interruption+ returns
def with_interruption(&block)
  @jr.with_interruption(&block)
end