Class: LogStash::Outputs::Kafka

Inherits:
  Base • Object
Includes:
  PluginMixins::Kafka::Common
Defined in:
  lib/logstash/outputs/kafka.rb

Overview

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:

Kafka Client Version | Logstash Version | Plugin Version | Why?
0.8                  | 2.0.0 - 2.x.x    | < 3.0.0        | Legacy, 0.8 is still popular
0.9                  | 2.0.0 - 2.3.x    | 3.x.x          | Works with the old Ruby Event API (`event['price'] = 10`)
0.9                  | 2.4.x - 5.x.x    | 4.x.x          | Works with the new getter/setter APIs (`event.set('[price]', 10)`)
0.10.0.x             | 2.4.x - 5.x.x    | 5.x.x          | Not compatible with the <= 0.9 broker
0.10.1.x             | 2.4.x - 5.x.x    | 6.x.x          |

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

This output supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default, security is disabled, but it can be turned on as needed.
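For example, enabling SSL typically means switching the security protocol and pointing the output at a truststore. The snippet below is only a minimal sketch: the broker address, path, and password are placeholders, and the exact set of ssl_* options available depends on your plugin version, so check the documentation for the version you run.

output {
  kafka {
    topic_id                => "mytopic"
    bootstrap_servers       => "broker1.example.com:9093"       # placeholder broker address
    security_protocol       => "SSL"                             # use "SASL_SSL" to combine with Kerberos
    ssl_truststore_location => "/path/to/client.truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                        # placeholder password
  }
}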

The only required configuration is topic_id. The default codec is plain, so events are written to the broker as plain text; with the codec's default format this includes a timestamp and the hostname in addition to the message itself. If you want only the message field to pass through, configure the output like this:

output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}

For more information, see kafka.apache.org/documentation.html#theproducer

Kafka producer configuration options: kafka.apache.org/documentation.html#newproducerconfigs
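Many of the upstream producer settings are exposed as plugin options with matching names. The sketch below shows a few commonly tuned ones with illustrative values; option availability and defaults vary by plugin version, so treat it as a starting point rather than a reference.

output {
  kafka {
    topic_id          => "mytopic"
    bootstrap_servers => "localhost:9092"   # placeholder broker list
    acks              => "all"              # wait for the full in-sync replica set to acknowledge
    compression_type  => "snappy"           # none, gzip, snappy, or lz4
    batch_size        => 16384              # bytes buffered per partition before a send
    linger_ms         => 5                  # wait up to 5 ms to fill a batch
  }
}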

Instance Method Summary

Methods included from PluginMixins::Kafka::Common

included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 315

def close
  @producer.close
end

#multi_receive(events) ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 224

def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  # Encoding fires the codec's on_event callback (set in #register), which
  # serializes the event and appends it to this thread's batch via #prepare.
  events.each do |event|
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end

#prepare(record) ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 219

def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end

#register ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 187

def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil? 
    if @retries < 0
      raise LogStash::ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end

  reassign_dns_lookup

  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
  end
  @message_headers.each do |key, value|
    if !key.is_a? String
      raise LogStash::ConfigurationError, "'message_headers' contains a key that is not a string!"
    end
  end
  @producer = create_producer
end
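
As the validation above implies, value_serializer accepts only the Kafka StringSerializer or ByteArraySerializer, and every key in message_headers must be a string. A minimal sketch exercising both (the header name and value are invented placeholders, and message_headers is only present in newer plugin versions):

output {
  kafka {
    topic_id         => "mytopic"
    value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
    message_headers  => { "source_app" => "my-service" }   # hypothetical header key/value
  }
}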

#retrying_send(batch) ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 241

def retrying_send(batch)
  remaining = @retries

  while batch.any?
    unless remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record| 
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.InterruptException,
             org.apache.kafka.common.errors.RetriableException => e
        logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
        failures << record
        nil
      rescue org.apache.kafka.common.KafkaException => e
        # This error is not retriable, drop event
        # TODO: add DLQ support
        logger.warn("producer send failed, dropping record",:exception => e.class, :message => e.message,
                    :record_value => record.value)
        nil
      end
    end

    futures.each_with_index do |future, i|
      # We cannot skip nils using `futures.compact` because then our index `i` will not align with `batch`
      unless future.nil?
        begin
          future.get
        rescue java.util.concurrent.ExecutionException => e
          # TODO(sissel): Add metric to count failures, possibly by exception type.
          if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
             e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
            logger.info("producer send failed, will retry sending", :exception => e.cause.class,
                        :message => e.cause.message)
            failures << batch[i]
          elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
            # This error is not retriable, drop event
            # TODO: add DLQ support
            logger.warn("producer send failed, dropping record", :exception => e.cause.class,
                        :message => e.cause.message, :record_value => batch[i].value)
          end
        end
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                                                                              :failures => failures.size,
                                                                              :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
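
The loop above is driven by two plugin options: retries caps how many resend passes are attempted (when unset, failed records are retried indefinitely), and retry_backoff_ms sets the delay between passes. A sketch that bounds retries, with the caveat logged in #register that a finite retry count means events can be dropped:

output {
  kafka {
    topic_id         => "mytopic"
    retries          => 3      # give up, and drop still-failing events, after 3 retry passes
    retry_backoff_ms => 500    # sleep 500 ms between retry passes
  }
}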