Class: LogStash::Inputs::Kafka
- Inherits: LogStash::Inputs::Base (Object > LogStash::Inputs::Base > LogStash::Inputs::Kafka)
- Includes: PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
- Defined in: lib/logstash/inputs/kafka.rb
Overview
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
| Kafka Client Version | Logstash Version | Plugin Version | Why? |
|----------------------|------------------|----------------|------|
| 0.8                  | 2.0.0 - 2.x.x    | <3.0.0         | Legacy, 0.8 is still popular |
| 0.9                  | 2.0.0 - 2.3.x    | 3.x.x          | Works with the old Ruby Event API (`event['price'] = 10`) |
| 0.9                  | 2.4.x - 5.x.x    | 4.x.x          | Works with the new getter/setter APIs (`event.set('[price]', 10)`) |
| 0.10.0.x             | 2.4.x - 5.x.x    | 5.x.x          | Not compatible with the <= 0.9 broker |
| 0.10.1.x             | 2.4.x - 5.x.x    | 6.x.x          | |
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
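For example, a minimal sketch of an SSL-secured consumer, assuming the plugin's current `security_protocol` and `ssl_truststore_*` option names (broker address, paths, and passwords are placeholders):

input {
  kafka {
    bootstrap_servers       => "broker1:9093"
    topics                  => ["events"]
    security_protocol       => "SSL"
    ssl_truststore_location => "/path/to/client.truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                        # placeholder password
  }
}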
The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.
Logstash instances by default form a single logical group to subscribe to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.
Ideally you should have as many threads as there are partitions for a perfect balance; more threads than partitions means that some threads will be idle (see the sketch below).
For more information see kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs
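As a concrete sketch (broker addresses and the topic name are placeholders), two Logstash instances running this pipeline would share one consumer group and split the topic's partitions between their threads:

input {
  kafka {
    bootstrap_servers => "broker1:9092,broker2:9092"  # placeholder brokers
    topics            => ["logs"]                     # placeholder topic
    group_id          => "logstash"                   # same group_id on every instance sharing the load
    consumer_threads  => 4                            # ideally threads * instances == partition count
  }
}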
Constant Summary collapse
- DEFAULT_DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer"
- METADATA_NONE = Set[].freeze
- METADATA_BASIC = Set[:record_props].freeze
- METADATA_EXTENDED = Set[:record_props, :headers].freeze
- METADATA_DEPRECATION_MAP = { 'true' => 'basic', 'false' => 'none' }
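These constants back the `decorate_events` option: "none" adds no metadata, "basic" adds record properties, and "extended" also adds headers; the legacy boolean values are mapped through METADATA_DEPRECATION_MAP. A minimal usage sketch (broker and topic are placeholders):

input {
  kafka {
    bootstrap_servers => "broker1:9092"
    topics            => ["logs"]
    decorate_events   => "extended"  # none | basic | extended ("true"/"false" are deprecated aliases)
  }
}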
Instance Attribute Summary collapse
- #metadata_mode ⇒ Object (readonly)
  Returns the value of attribute metadata_mode.
Instance Method Summary collapse
- #do_poll(consumer) ⇒ Object
- #handle_record(record, codec_instance, queue) ⇒ Object
- #initialize(params = {}) ⇒ Kafka
  constructor
  A new instance of Kafka.
- #kafka_consumers ⇒ Object
- #maybe_commit_offset(consumer) ⇒ Object
- #maybe_set_metadata(event, record) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
- #stop ⇒ Object
- #subscribe(consumer) ⇒ Object
- #thread_runner(logstash_queue, consumer, name) ⇒ Object
Methods included from PluginMixins::Kafka::AvroSchemaRegistry
#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Constructor Details
#initialize(params = {}) ⇒ Kafka
Returns a new instance of Kafka.
# File 'lib/logstash/inputs/kafka.rb', line 262

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end
  super(params)
end
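The constructor's effect is that setting `schema_registry_url` flips the default codec from plain to json, since registry-managed Avro records are deserialized into JSON. A hedged sketch (broker and registry URL are placeholders):

input {
  kafka {
    bootstrap_servers   => "broker1:9092"
    topics              => ["avro-events"]
    schema_registry_url => "http://registry.example.com:8081"  # placeholder; codec now defaults to json
  }
}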
Instance Attribute Details
#metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.
# File 'lib/logstash/inputs/kafka.rb', line 259

def metadata_mode
  @metadata_mode
end
Instance Method Details
#do_poll(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 347

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end
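The poll duration above comes from the plugin's `poll_timeout_ms` setting; a hedged sketch that lengthens it for a low-traffic topic (values are illustrative):

input {
  kafka {
    bootstrap_servers => "broker1:9092"
    topics            => ["logs"]
    poll_timeout_ms   => 500  # each poll blocks up to 500 ms waiting for records
  }
}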
#handle_record(record, codec_instance, queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 366

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end
#kafka_consumers ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 320

def kafka_consumers
  @runner_consumers
end
#maybe_commit_offset(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 397

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
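As the guard shows, commitSync is only issued when `enable_auto_commit` is false; otherwise the Kafka client commits offsets in the background. A hedged sketch of the manual-commit mode (broker and topic are placeholders):

input {
  kafka {
    bootstrap_servers  => "broker1:9092"
    topics             => ["logs"]
    enable_auto_commit => false  # offsets are committed with commitSync after each processed batch
  }
}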
#maybe_set_metadata(event, record) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 375

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select{|h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8)
      if s.valid_encoding?
        event.set("[@metadata][kafka][headers][" + header.key + "]", s)
      end
    end
  end
end
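With `decorate_events` set to basic or extended, these `[@metadata][kafka]` fields can be referenced downstream in the pipeline. A hedged sketch (the elasticsearch output, host, and index pattern are illustrative):

input {
  kafka {
    bootstrap_servers => "broker1:9092"
    topics            => ["logs"]
    decorate_events   => "basic"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]                    # placeholder host
    index => "kafka-%{[@metadata][kafka][topic]}"  # route events by their source topic
  }
}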
#register ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 271

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end
#run(logstash_queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 302

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i|
    thread_runner(logstash_queue, consumer, "kafka-input-worker-#{client_id}-#{i}")
  }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
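Note how a configured `group_instance_id` is suffixed with the thread index when `consumer_threads` is greater than one, so each thread presents a unique static member id to the broker. A hedged sketch (names are placeholders):

input {
  kafka {
    bootstrap_servers => "broker1:9092"
    topics            => ["logs"]
    consumer_threads  => 2
    group_instance_id => "logstash-node-1"  # threads join as logstash-node-1-0 and logstash-node-1-1
  }
}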
#stop ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 314

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
#subscribe(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 324

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
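Subscription uses either the literal `topics` list or, when `topics_pattern` is set, the pattern compiled in #register. A hedged sketch of the pattern form (broker and pattern are illustrative):

input {
  kafka {
    bootstrap_servers => "broker1:9092"
    topics_pattern    => "logs-.*"  # subscribe to every topic matching the regular expression
  }
}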
#thread_runner(logstash_queue, consumer, name) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 329

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end