Class: LogStash::Inputs::Kafka
- Inherits: LogStash::Inputs::Base
  - Object
  - LogStash::Inputs::Base
  - LogStash::Inputs::Kafka
- Includes:
- PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
- Defined in:
- lib/logstash/inputs/kafka.rb
Overview
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
| Kafka Client Version | Logstash Version | Plugin Version | Why? |
|----------------------|------------------|----------------|------|
| 0.8      | 2.0.0 - 2.x.x | <3.0.0 | Legacy, 0.8 is still popular |
| 0.9      | 2.0.0 - 2.3.x | 3.x.x  | Works with the old Ruby Event API (`event['price'] = 10`) |
| 0.9      | 2.4.x - 5.x.x | 4.x.x  | Works with the new getter/setter APIs (`event.set('[price]', 10)`) |
| 0.10.0.x | 2.4.x - 5.x.x | 5.x.x  | Not compatible with the <= 0.9 broker |
| 0.10.1.x | 2.4.x - 5.x.x | 6.x.x  | |
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
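As an illustration (not taken from the official docs), here is a minimal sketch of the settings hash that might be passed to the plugin to enable SSL. The option names follow the plugin's documented settings; the topic, truststore path, and password are placeholders.

# Hedged sketch: enabling SSL on the consumer.
LogStash::Inputs::Kafka.new(
  "topics"                  => ["demo-topic"],                       # placeholder topic
  "security_protocol"       => "SSL",
  "ssl_truststore_location" => "/etc/kafka/client.truststore.jks",   # placeholder path
  "ssl_truststore_password" => "changeit"                            # placeholder secret
)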
The Logstash Kafka consumer handles group management and uses the default offset management strategy, which stores offsets in Kafka topics.
Logstash instances by default form a single logical group that subscribes to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed across all Logstash instances with the same `group_id`.
Ideally you should have as many threads as the number of partitions for a perfect balance; more threads than partitions means that some threads will be idle.
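For instance, here is a hedged sketch of how the thread/partition balance plays out. `consumer_threads` and `group_id` are the plugin's documented options; the topic name and partition count are assumptions for illustration.

# Hedged sketch: two instances x 3 threads consuming a 6-partition topic.
# With a shared group_id, Kafka assigns each of the 6 partitions to exactly
# one of the 6 consumer threads; a 7th thread would sit idle.
LogStash::Inputs::Kafka.new(
  "topics"           => ["demo-topic"],   # assume this topic has 6 partitions
  "group_id"         => "logstash",       # same id on every instance
  "consumer_threads" => 3                 # per instance; 2 instances => 6 consumers
)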
For more information, see https://kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: https://kafka.apache.org/documentation.html#consumerconfigs
Constant Summary
- DEFAULT_DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer"
- METADATA_NONE = Set[].freeze
- METADATA_BASIC = Set[:record_props].freeze
- METADATA_EXTENDED = Set[:record_props, :headers].freeze
- METADATA_DEPRECATION_MAP = { 'true' => 'basic', 'false' => 'none' }
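The deprecation map translates the legacy boolean form of `decorate_events` into the newer mode names. A hedged sketch of how these constants might be resolved (the plugin's actual resolver method is not shown on this page):

raw  = 'true'                                     # legacy boolean setting
mode = METADATA_DEPRECATION_MAP.fetch(raw, raw)   # => 'basic'
case mode
when 'none'     then METADATA_NONE        # no kafka metadata on events
when 'basic'    then METADATA_BASIC       # topic/partition/offset/key/timestamp
when 'extended' then METADATA_EXTENDED    # record props plus headers
end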
Instance Attribute Summary
- #metadata_mode ⇒ Object (readonly)
  Returns the value of attribute metadata_mode.
Instance Method Summary
- #do_poll(consumer) ⇒ Object
- #handle_record(record, codec_instance, queue) ⇒ Object
- #initialize(params = {}) ⇒ Kafka
  constructor
  A new instance of Kafka.
- #kafka_consumers ⇒ Object
- #maybe_commit_offset(consumer) ⇒ Object
- #maybe_set_metadata(event, record) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
- #stop ⇒ Object
- #subscribe(consumer) ⇒ Object
- #thread_runner(logstash_queue, consumer, name) ⇒ Object
Methods included from PluginMixins::Kafka::AvroSchemaRegistry
#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Constructor Details
#initialize(params = {}) ⇒ Kafka
Returns a new instance of Kafka.
# File 'lib/logstash/inputs/kafka.rb', line 260

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end
  super(params)
end
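A short usage sketch of the codec defaulting above. The registry URL is a placeholder, and reading the resulting codec via the generated `codec` accessor is an assumption about the plugin config machinery.

plain = LogStash::Inputs::Kafka.new("topics" => ["demo"])
plain.codec       # plain codec, since no schema_registry_url was given

avro = LogStash::Inputs::Kafka.new(
  "topics"              => ["demo"],
  "schema_registry_url" => "http://registry.example:8081"  # placeholder
)
avro.codec        # json codec, to decode registry-managed payloads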
Instance Attribute Details
#metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.
# File 'lib/logstash/inputs/kafka.rb', line 257

def metadata_mode
  @metadata_mode
end
Instance Method Details
#do_poll(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 345

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end
#handle_record(record, codec_instance, queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 364

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end
#kafka_consumers ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 318

def kafka_consumers
  @runner_consumers
end
#maybe_commit_offset(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 395

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
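Note that the synchronous commit above only runs when auto-commit is off. A hedged sketch of the relevant settings; option names follow the plugin's documented config, the topic is a placeholder.

# With auto-commit disabled, the plugin commits after each processed batch
# (at-least-once delivery); with the default ("true"), the Kafka client
# commits on its own interval and maybe_commit_offset does nothing.
LogStash::Inputs::Kafka.new(
  "topics"             => ["demo"],
  "enable_auto_commit" => "false",
  "poll_timeout_ms"    => 100        # matches the plugin default
)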
#maybe_set_metadata(event, record) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 373

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select { |h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8)
      if s.valid_encoding?
        event.set("[@metadata][kafka][headers][" + header.key + "]", s)
      end
    end
  end
end
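Downstream filters can then read these fields through the standard event API, for example (the header name below is a placeholder):

# Sketch: reading the decorated fields set above; requires decorate_events
# mode 'basic' or 'extended'.
event.get("[@metadata][kafka][topic]")               # source topic name
event.get("[@metadata][kafka][offset]")              # record offset in its partition
event.get("[@metadata][kafka][headers][trace-id]")   # 'extended' mode only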
#register ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 269

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end
#run(logstash_queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 300

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i|
    thread_runner(logstash_queue, consumer, "kafka-input-worker-#{client_id}-#{i}")
  }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
#stop ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 312

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
#subscribe(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 322

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
#thread_runner(logstash_queue, consumer, name) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 327

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end
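Putting the pieces together, a hedged end-to-end sketch of the lifecycle the pipeline drives: register, run on a background thread, then stop to wake blocked consumers. The topic name is a placeholder, and driving the plugin outside a real pipeline is an assumption for illustration.

input = LogStash::Inputs::Kafka.new("topics" => ["demo"], "consumer_threads" => 2)
queue = Queue.new

input.register                             # resolves metadata mode, topic pattern, etc.
runner = Thread.new { input.run(queue) }   # run blocks while consumer threads poll

# ... later, on shutdown: wakeup unblocks poll/commitSync so threads can exit
input.stop
runner.join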