Class: TestKafka::Broker
- Inherits: Object
- Inheritance hierarchy: Object → TestKafka::Broker
- Defined in:
- lib/test_kafka/broker.rb
Constant Summary
# Baseline Kafka broker properties, keyed by property name. Broker#initialize
# merges its per-instance overrides ("broker.id", "port", "log.dir",
# "kafka.csv.metrics.dir", "num.partitions", "zookeeper.connect") on top of
# these. The hash itself is frozen; merging returns a new hash.
DEFAULT_PROPERTIES = {
  "broker.id" => 0,
  "port" => 9092,
  "num.network.threads" => 2,
  "num.io.threads" => 2,
  "socket.send.buffer.bytes" => 1_048_576,     # 1 MiB
  "socket.receive.buffer.bytes" => 1_048_576,  # 1 MiB
  "socket.request.max.bytes" => 104_857_600,   # 100 MiB
  "log.dir" => "/tmp/kafka-logs",
  "num.partitions" => 1,
  "log.flush.interval.messages" => 10_000,
  "log.flush.interval.ms" => 1000,
  "log.retention.hours" => 168,                # 7 days
  "log.segment.bytes" => 536_870_912,          # 512 MiB
  "log.cleanup.interval.mins" => 1,
  "zookeeper.connect" => "localhost:2181",
  "zookeeper.connection.timeout.ms" => 1_000_000,
  "kafka.metrics.polling.interval.secs" => 5,
  "kafka.metrics.reporters" => "kafka.metrics.KafkaCSVMetricsReporter",
  "kafka.csv.metrics.dir" => "/tmp/kafka_metrics",
  "kafka.csv.metrics.reporter.enabled" => "false",
}.freeze
Instance Attribute Summary
-
#broker_id ⇒ Object
readonly
Returns the value of attribute broker_id.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
Instance Method Summary
-
#initialize(kafka_path, tmp_dir, port, zk_port, broker_id = 0, partition_count = 1) ⇒ Broker
constructor
A new instance of Broker.
- #pid ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #with_interruption(&block) ⇒ Object
Constructor Details
#initialize(kafka_path, tmp_dir, port, zk_port, broker_id = 0, partition_count = 1) ⇒ Broker
Returns a new instance of Broker.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/test_kafka/broker.rb', line 28

# Records the broker id and port, then builds the JavaRunner (kept in @jr)
# that the other instance methods delegate to. The runner receives
# DEFAULT_PROPERTIES with the per-instance overrides below merged on top.
#
# @param kafka_path passed through to JavaRunner unchanged
# @param tmp_dir passed to JavaRunner and used as the parent directory for
#   this broker's "log.dir" and "kafka.csv.metrics.dir"
# @param port stored as #port, passed to JavaRunner, and set as the "port" property
# @param zk_port port used to build the "zookeeper.connect" value "localhost:<zk_port>"
# @param broker_id stored as #broker_id; also used in the runner name and log directory name
# @param partition_count value for the "num.partitions" property
def initialize(kafka_path, tmp_dir, port, zk_port, broker_id = 0, partition_count = 1)
  @broker_id = broker_id
  @port = port

  # The log directory is suffixed with the broker id; the metrics directory is not.
  overrides = {
    "broker.id" => broker_id,
    "port" => port,
    "log.dir" => "#{tmp_dir}/kafka-logs_#{broker_id}",
    "kafka.csv.metrics.dir" => "#{tmp_dir}/kafka_metrics",
    "num.partitions" => partition_count,
    "zookeeper.connect" => "localhost:#{zk_port}"
  }

  @jr = JavaRunner.new(
    "broker_#{broker_id}",
    tmp_dir,
    "kafka.Kafka",
    port,
    kafka_path,
    DEFAULT_PROPERTIES.merge(overrides)
  )
end
Instance Attribute Details
#broker_id ⇒ Object (readonly)
Returns the value of attribute broker_id.
46 47 48 |
# File 'lib/test_kafka/broker.rb', line 46

# Read-only accessor for the broker id given to the constructor.
#
# @return the value of @broker_id
def broker_id
  @broker_id
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
46 47 48 |
# File 'lib/test_kafka/broker.rb', line 46

# Read-only accessor for the port given to the constructor.
#
# @return the value of @port
def port
  @port
end
Instance Method Details
#pid ⇒ Object
56 57 58 |
# File 'lib/test_kafka/broker.rb', line 56

# Delegates to the underlying JavaRunner's #pid.
#
# @return whatever @jr.pid returns (presumably the broker process id —
#   confirm against JavaRunner)
def pid
  @jr.pid
end
#start ⇒ Object
48 49 50 |
# File 'lib/test_kafka/broker.rb', line 48

# Delegates to the underlying JavaRunner's #start.
#
# @return whatever @jr.start returns
def start
  @jr.start
end
#stop ⇒ Object
52 53 54 |
# File 'lib/test_kafka/broker.rb', line 52

# Delegates to the underlying JavaRunner's #stop.
#
# @return whatever @jr.stop returns
def stop
  @jr.stop
end
#with_interruption(&block) ⇒ Object
60 61 62 |
# File 'lib/test_kafka/broker.rb', line 60

# Forwards the given block to the underlying JavaRunner's #with_interruption.
# What happens around the block is decided by JavaRunner (presumably the
# process is interrupted while the block runs — confirm there).
#
# @yield handed unchanged to @jr.with_interruption
# @return whatever @jr.with_interruption returns
def with_interruption(&block)
  @jr.with_interruption(&block)
end