Class: LogStash::Inputs::Kafka

Inherits:
Base
  • Object
Defined in:
lib/logstash/inputs/kafka.rb

Overview

This input reads events from a Kafka topic. It uses the newly designed 0.10 version of the consumer API provided by Kafka to read messages from the broker.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:

[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Security Features |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 | |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |SSL |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |SSL |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |SSL |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |SSL |Not compatible with the <= 0.10.0.x broker
|==========================================================

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.

By default, Logstash instances form a single logical group to subscribe to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.

Ideally, you should have as many threads as there are partitions for a perfect balance. More threads than partitions means that some threads will be idle.
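The counting argument behind this advice can be sketched in a few lines of Ruby. The `assign_partitions` helper below is hypothetical and is not part of the plugin or of Kafka; it spreads partitions round-robin, which only approximates what Kafka's real assignors do, but it shows why surplus threads receive nothing to read.

```ruby
# Hypothetical helper (not part of the plugin): spread partition numbers
# over consumer threads round-robin. Kafka's actual assignment strategies
# differ in detail; this only illustrates the counting argument.
def assign_partitions(partition_count, thread_count)
  assignment = Array.new(thread_count) { [] }
  partition_count.times do |partition|
    assignment[partition % thread_count] << partition
  end
  assignment
end

# As many threads as partitions: every thread owns exactly one partition.
balanced = assign_partitions(4, 4)

# More threads than partitions: the surplus threads own nothing.
oversubscribed = assign_partitions(4, 6)
idle_threads = oversubscribed.count(&:empty?)

puts "4 partitions / 4 threads: #{balanced.inspect}"
puts "4 partitions / 6 threads: #{oversubscribed.inspect} (#{idle_threads} idle)"
```

With four partitions, a fifth and sixth thread add no throughput. If more parallelism is needed, the topic itself needs more partitions.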

For more information see kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs

This version also adds support for SSL/TLS-secured connections to Kafka. By default, SSL is disabled, but it can be turned on as needed.

Instance Method Details

#register ⇒ Object

# File 'lib/logstash/inputs/kafka.rb', line 213

def register
  @runner_threads = []
end

#run(logstash_queue) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 218

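def run(logstash_queue)
  # Create one Kafka consumer per configured consumer thread.
  @runner_consumers = consumer_threads.times.map { create_consumer }
  # Start one runner thread per consumer.
  @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
  # Block until every runner thread has finished.
  @runner_threads.each(&:join)
end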

#stop ⇒ Object

# File 'lib/logstash/inputs/kafka.rb', line 225

def stop
  @runner_consumers.each { |c| c.wakeup }
end
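
The interplay between `run` and `stop` can be modelled without Kafka. The sketch below is only an illustration: `FakeConsumer` and `WakeupError` are hypothetical stand-ins, and the body of the plugin's `thread_runner` is not shown on this page, so the loop here is an assumption about its general shape. It relies on the documented behaviour of the Kafka Java consumer, where `wakeup` makes a blocked `poll` raise an exception that the polling thread can treat as a shutdown signal.

```ruby
# Hypothetical stand-in for a Kafka consumer: poll blocks until wakeup
# is called, then raises, mirroring how a wakeup aborts a blocking poll.
class FakeConsumer
  class WakeupError < StandardError; end

  def initialize
    @signal = Queue.new
  end

  # Blocks (like a poll with no data available) until woken up.
  def poll
    @signal.pop
    raise WakeupError
  end

  # Safe to call from another thread; unblocks the pending poll.
  def wakeup
    @signal << :wakeup
  end
end

consumers = Array.new(2) { FakeConsumer.new }
stopped = Queue.new

# Assumed shape of a runner thread: poll forever, exit on wakeup.
threads = consumers.map do |consumer|
  Thread.new do
    begin
      loop { consumer.poll }
    rescue FakeConsumer::WakeupError
      stopped << :stopped
    end
  end
end

consumers.each(&:wakeup) # the role played by stop
threads.each(&:join)     # the role played by the join in run

puts "threads stopped: #{stopped.size}"
```

Because `run` blocks on `join`, it returns only after every runner thread has reacted to the wakeup, which is what lets `stop` shut the input down from a different thread.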