Class: LogStash::Inputs::Kafka
- Inherits: LogStash::Inputs::Base (Object → LogStash::Inputs::Base → LogStash::Inputs::Kafka)
- Includes: PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
- Defined in: lib/logstash/inputs/kafka.rb
Overview
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8      |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9      |2.0.0 - 2.3.x |3.x.x  |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9      |2.4.x - 5.x.x |4.x.x  |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x  |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x  |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
The Logstash Kafka consumer handles group management and uses the default offset management strategy, which stores offsets in Kafka topics.
By default, Logstash instances form a single logical group that subscribes to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.
Ideally you should have as many threads as there are partitions for a perfect balance; more threads than partitions means that some threads will be idle.
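The group and thread settings above might be expressed in a pipeline configuration like the following sketch (broker address, topic name, and group name are placeholders, not defaults from this document):

```text
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics            => ["example-topic"]
    group_id          => "logstash"
    consumer_threads  => 4   # ideally equal to the topic's partition count
  }
}
```

Running this same pipeline on two Logstash hosts with the same `group_id` splits the topic's partitions across both instances.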
For more information, see kafka.apache.org/documentation.html#theconsumer
For Kafka consumer configuration, see kafka.apache.org/documentation.html#consumerconfigs
Constant Summary collapse
- DEFAULT_DESERIALIZER_CLASS =
"org.apache.kafka.common.serialization.StringDeserializer"
- METADATA_NONE =
Set[].freeze
- METADATA_BASIC =
Set[:record_props].freeze
- METADATA_EXTENDED =
Set[:record_props, :headers].freeze
- METADATA_DEPRECATION_MAP =
{ 'true' => 'basic', 'false' => 'none' }
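The constants above encode the metadata modes as capability sets, with `METADATA_DEPRECATION_MAP` translating the legacy boolean values of `decorate_events` into the named modes. A minimal standalone sketch of that mapping (the `extract` helper here is hypothetical, not part of the plugin's API):

```ruby
require 'set'

# Mirror the plugin's constants: each mode is a Set of capabilities.
METADATA_NONE     = Set[].freeze
METADATA_BASIC    = Set[:record_props].freeze
METADATA_EXTENDED = Set[:record_props, :headers].freeze
# Legacy boolean values for `decorate_events` map onto the named modes.
METADATA_DEPRECATION_MAP = { 'true' => 'basic', 'false' => 'none' }.freeze

# Hypothetical helper: resolve a decorate_events setting to a mode set.
def extract(decorate_events)
  value = METADATA_DEPRECATION_MAP.fetch(decorate_events, decorate_events)
  case value
  when 'none'     then METADATA_NONE
  when 'basic'    then METADATA_BASIC
  when 'extended' then METADATA_EXTENDED
  else raise ArgumentError, "unsupported decorate_events: #{decorate_events.inspect}"
  end
end

puts extract('true').include?(:record_props)   # legacy 'true' behaves like 'basic'
puts extract('extended').include?(:headers)
```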
Instance Attribute Summary collapse
- #metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.
Instance Method Summary collapse
- #do_poll(consumer) ⇒ Object
- #handle_record(record, codec_instance, queue) ⇒ Object
- #initialize(params = {}) ⇒ Kafka
constructor
A new instance of Kafka.
- #kafka_consumers ⇒ Object
- #maybe_commit_offset(consumer) ⇒ Object
- #maybe_set_metadata(event, record) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
- #stop ⇒ Object
- #subscribe(consumer) ⇒ Object
- #thread_runner(logstash_queue, consumer, name) ⇒ Object
Methods included from PluginMixins::Kafka::AvroSchemaRegistry
#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Constructor Details
#initialize(params = {}) ⇒ Kafka
Returns a new instance of Kafka.
# File 'lib/logstash/inputs/kafka.rb', line 254

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end
  super(params)
end
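The constructor's codec default can be seen in isolation with a plain-Ruby sketch (the `default_codec` helper is illustrative, not part of the plugin): an explicit `codec` wins, otherwise schema-registry users get `json` and everyone else gets `plain`.

```ruby
# Pick the codec exactly as the constructor does.
def default_codec(params)
  return params['codec'] if params.key?('codec')
  params.key?('schema_registry_url') ? 'json' : 'plain'
end

puts default_codec({})                                        # => plain
puts default_codec('schema_registry_url' => 'http://sr')      # => json
puts default_codec('codec' => 'avro')                         # => avro
```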
Instance Attribute Details
#metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.
# File 'lib/logstash/inputs/kafka.rb', line 251

def metadata_mode
  @metadata_mode
end
Instance Method Details
#do_poll(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 339

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end
#handle_record(record, codec_instance, queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 358

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end
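The unary `+` in `handle_record` matters because, as the comment notes, `nil.to_s` (and `true`/`false`.to_s) return frozen strings on Ruby 2.7+, which a codec could not safely mutate. A quick demonstration of the copy it produces:

```ruby
# On Ruby 2.7+, nil.to_s returns a frozen string; unary + returns
# an unfrozen copy that downstream code may mutate.
frozen_value  = nil.to_s
mutable_value = +frozen_value

puts frozen_value.frozen?    # true on Ruby 2.7+
puts mutable_value.frozen?   # false
```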
#kafka_consumers ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 312

def kafka_consumers
  @runner_consumers
end
#maybe_commit_offset(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 389

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
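Note that `commitSync` only runs when auto commit is disabled; with the plugin's default of automatic commits, the Kafka client commits offsets in the background instead. A hypothetical input fragment opting into the manual path (broker and topic are placeholders):

```text
input {
  kafka {
    bootstrap_servers  => "localhost:9092"
    topics             => ["example-topic"]
    enable_auto_commit => false   # offsets committed via commitSync after each poll
  }
}
```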
#maybe_set_metadata(event, record) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 367

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select { |h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8) if s.valid_encoding?
      event.set("[@metadata][kafka][headers][" + header.key + "]", s)
    end
  end
end
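The header branch only re-tags bytes as UTF-8 when they actually form a valid UTF-8 sequence; otherwise the raw bytes pass through unchanged. One way to express the same intent in plain Ruby, without JRuby's `String.from_java_bytes` (the `utf8_or_binary` helper is illustrative, not the plugin's):

```ruby
# Re-tag header bytes as UTF-8 only when they form valid UTF-8;
# otherwise keep the raw binary string untouched.
def utf8_or_binary(raw)
  utf8 = raw.dup.force_encoding(Encoding::UTF_8)
  utf8.valid_encoding? ? utf8 : raw
end

puts utf8_or_binary("caf\xC3\xA9".b)            # valid UTF-8 bytes decode as text
puts utf8_or_binary("\xFF\xFE".b).encoding      # invalid bytes stay ASCII-8BIT
```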
#register ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 263

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end
#run(logstash_queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 294

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i|
    thread_runner(logstash_queue, consumer, "kafka-input-worker-#{client_id}-#{i}")
  }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
#stop ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 306

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
#subscribe(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 316

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
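As `subscribe` shows, a compiled `topics_pattern` takes precedence over a literal topic list. A hypothetical fragment using pattern subscription (broker address and pattern are placeholders):

```text
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics_pattern    => "logs-.*"   # subscribes to every topic matching the regex
  }
}
```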
#thread_runner(logstash_queue, consumer, name) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 321

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end