Class: LogStash::Inputs::Kafka

Inherits:
LogStash::Inputs::Base
Includes:
PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
Defined in:
lib/logstash/inputs/kafka.rb

Overview

This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
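As a minimal sketch of what that looks like when the plugin is instantiated directly (the broker address and topic name are hypothetical; in practice these settings come from a pipeline config):

# Minimal sketch: read from one topic on a local broker.
input = LogStash::Inputs::Kafka.new(
  'bootstrap_servers' => 'localhost:9092', # hypothetical broker address
  'topics'            => ['logstash']      # hypothetical topic
)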

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:

[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |
|==========================================================

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and the 0.9 consumer APIs, but not the other way around.

This input supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed.
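For illustration, here is a sketch of turning on SSL through the params hash that the constructor below accepts (normally these would appear in a pipeline config). The option names match those found in recent plugin versions; the broker address, paths, and password are hypothetical:

# Sketch: connect over SSL with a client truststore.
input = LogStash::Inputs::Kafka.new(
  'bootstrap_servers'       => 'broker.example.com:9093',            # hypothetical broker
  'topics'                  => ['logstash'],
  'security_protocol'       => 'SSL',
  'ssl_truststore_location' => '/etc/kafka/client.truststore.jks',   # hypothetical path
  'ssl_truststore_password' => 'changeit'                            # hypothetical password
)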

The Logstash Kafka consumer handles group management and uses the default offset management strategy, which stores offsets in Kafka topics.

By default, Logstash instances form a single logical group that subscribes to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.

Ideally you should have as many threads as there are partitions for a perfect balance; more threads than partitions means that some threads will be idle.
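For example, a topic with four partitions (a hypothetical value) is fully balanced by four consumer threads sharing one `group_id`:

# Sketch: assumes the 'logstash' topic has 4 partitions. A 5th thread
# would sit idle, while 3 threads would leave one of them owning two
# partitions.
input = LogStash::Inputs::Kafka.new(
  'topics'           => ['logstash'],
  'group_id'         => 'logstash',
  'consumer_threads' => 4
)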

For more information see kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs

Constant Summary

DEFAULT_DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer"
METADATA_NONE              = Set[].freeze
METADATA_BASIC             = Set[:record_props].freeze
METADATA_EXTENDED          = Set[:record_props, :headers].freeze
METADATA_DEPRECATION_MAP   = { 'true' => 'basic', 'false' => 'none' }
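These constants describe the metadata "levels" selected by the `decorate_events` setting, and `METADATA_DEPRECATION_MAP` translates the deprecated boolean values into the named modes. A hypothetical sketch of how such a lookup could work (the plugin's actual helper is not shown on this page):

# Hypothetical helper: map a decorate_events value to a metadata-mode Set.
# Deprecated 'true'/'false' values are first translated via
# METADATA_DEPRECATION_MAP ('true' => 'basic', 'false' => 'none').
def metadata_mode_for(decorate_events)
  name = METADATA_DEPRECATION_MAP.fetch(decorate_events, decorate_events)
  case name
  when 'none'     then METADATA_NONE
  when 'basic'    then METADATA_BASIC
  when 'extended' then METADATA_EXTENDED
  else raise LogStash::ConfigurationError, "unknown decorate_events value: #{decorate_events}"
  end
end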

Instance Attribute Summary

Instance Method Summary

Methods included from PluginMixins::Kafka::AvroSchemaRegistry

#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?

Methods included from PluginMixins::Kafka::Common

included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config

Constructor Details

#initialize(params = {}) ⇒ Kafka

Returns a new instance of Kafka.



# File 'lib/logstash/inputs/kafka.rb', line 262

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end

  super(params)
end
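In other words, when no codec is given explicitly, configuring a schema registry flips the default codec from `plain` to `json`. A small sketch (the registry URL is hypothetical):

# No explicit codec: the default depends on schema_registry_url.
plain_input = LogStash::Inputs::Kafka.new('topics' => ['logstash'])
json_input  = LogStash::Inputs::Kafka.new(
  'topics'              => ['avro-topic'],
  'schema_registry_url' => 'http://localhost:8081' # hypothetical registry
)
# plain_input falls back to the 'plain' codec; json_input uses 'json'.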

Instance Attribute Details

#metadata_mode ⇒ Object (readonly)

Returns the value of attribute metadata_mode.



# File 'lib/logstash/inputs/kafka.rb', line 259

def metadata_mode
  @metadata_mode
end

Instance Method Details

#do_poll(consumer) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 347

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end

#handle_record(record, codec_instance, queue) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 366

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end

#kafka_consumersObject



# File 'lib/logstash/inputs/kafka.rb', line 320

def kafka_consumers
  @runner_consumers
end

#maybe_commit_offset(consumer) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 397

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
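Note the guard: the manual `commitSync` runs only when auto-commit is disabled. A sketch of opting into manual commits:

# Sketch: with enable_auto_commit => false, offsets are committed
# synchronously (commitSync above) only after a polled batch has been
# handled, giving roughly at-least-once delivery semantics.
input = LogStash::Inputs::Kafka.new(
  'topics'             => ['logstash'],
  'enable_auto_commit' => false
)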

#maybe_set_metadata(event, record) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 375

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select{|h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8)
      if s.valid_encoding?
        event.set("[@metadata][kafka][headers][" + header.key + "]", s)
      end
    end
  end
end
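The fields written here land under `[@metadata][kafka]`, so they are visible to filters and outputs but are not serialized into the event itself. A sketch of selecting the extended mode, which also copies record headers whose values are valid UTF-8:

# Sketch: decorate_events => 'extended' enables both :record_props and
# :headers (see METADATA_EXTENDED above); 'basic' enables only the former.
input = LogStash::Inputs::Kafka.new(
  'topics'          => ['logstash'],
  'decorate_events' => 'extended'
)
# Downstream, e.g. in an output: "%{[@metadata][kafka][topic]}"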

#registerObject



# File 'lib/logstash/inputs/kafka.rb', line 271

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end

#run(logstash_queue) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 302

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i| thread_runner(logstash_queue, consumer,
                                                                                   "kafka-input-worker-#{client_id}-#{i}") }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
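When several consumer threads share a static `group_instance_id`, each must present a unique id to the brokers, hence the `-#{i}` suffix applied above. A sketch (the id is hypothetical):

# Sketch: two threads with a static membership id; the consumers register
# as "indexer-0" and "indexer-1" (client_id is suffixed the same way).
input = LogStash::Inputs::Kafka.new(
  'topics'            => ['logstash'],
  'consumer_threads'  => 2,
  'group_instance_id' => 'indexer' # hypothetical static member id
)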

#stopObject



# File 'lib/logstash/inputs/kafka.rb', line 314

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end

#subscribe(consumer) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 324

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
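Subscription is either by the explicit `topics` list or by the regex compiled from `topics_pattern` in #register; as the code shows, the pattern takes precedence when set. A sketch (the pattern itself is hypothetical):

# Sketch: subscribe to every topic matching a regex instead of a fixed list.
input = LogStash::Inputs::Kafka.new('topics_pattern' => 'logs-.*')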

#thread_runner(logstash_queue, consumer, name) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 329

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end
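Note that each worker thread clones the codec (`@codec.clone`) rather than sharing a single instance, since codec instances carry per-stream state and are not safe to share across threads; the consumer is closed in the `ensure` block so it is released even if the poll loop raises.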