Class: LogStash::Outputs::Kafka

Inherits:
Base
  • Object
Defined in:
lib/logstash/outputs/kafka.rb

Overview

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:

|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8      |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9      |2.0.0 - 2.3.x |3.x.x  |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9      |2.4.x - 5.x.x |4.x.x  |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x  |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x  |

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers are designed to be backwards compatible. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

This output supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed.
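As an illustration, here is a minimal SSL-enabled sketch. The option names below follow the 5.x plugin and are assumptions if you run a different release; verify them against your installed version's documentation:

output {
  kafka {
    topic_id => "mytopic"
    security_protocol => "SSL"                           # assumed option; enables TLS to the brokers
    ssl_truststore_location => "/path/to/truststore.jks" # assumed option; JKS containing the broker CA
    ssl_truststore_password => "changeit"
  }
}

For Kerberos, the same sketch would use security_protocol => "SASL_SSL" together with the plugin's SASL/Kerberos settings.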

The only required setting is topic_id. The default codec is plain, so events are persisted on the broker in plain format, and Logstash encodes not only the message itself but also a timestamp and the hostname. If you do not want anything but your message passing through, configure the output something like this:

output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}

For more information see kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
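Most of the producer-level settings on that page are surfaced as plugin options. A hedged sketch (option names as in the 5.x/6.x plugin; check them against your installed version):

output {
  kafka {
    topic_id => "mytopic"
    bootstrap_servers => "kafka1:9092,kafka2:9092"  # broker list; assumed default is "localhost:9092"
    acks => "1"                                     # "0", "1", or "all"
    compression_type => "gzip"                      # "none", "gzip", "snappy", or "lz4"
    linger_ms => 5                                  # let the producer batch for up to 5 ms
  }
}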

Instance Method Summary

  • #close ⇒ Object
  • #multi_receive(events) ⇒ Object
  • #prepare(record) ⇒ Object
  • #register ⇒ Object
  • #retrying_send(batch) ⇒ Object

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 291

def close
  @producer.close
end

#multi_receive(events) ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 211

def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    # Encoding fires the codec's on_event callback (wired up in #register),
    # which builds the Kafka record and appends it to this thread's batch
    # via #prepare.
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
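The thread-keyed batch is what lets multi_receive run concurrently across pipeline workers without locking. A standalone Ruby sketch of the same pattern (the names and workload below are illustrative, not part of the plugin):

require 'concurrent'

batches = Concurrent::Hash.new

workers = 4.times.map do
  Thread.new do
    t = Thread.current
    # Each thread lazily creates and then reuses its own batch, so no two
    # threads ever touch the same array (mirrors @thread_batch_map above).
    batches[t] = [] unless batches.include?(t)
    batch = batches[t]
    3.times { |i| batch << "record-#{i}" }
    # A real sender would flush here (as retrying_send does), then clear.
    batch.clear
  end
end
workers.each(&:join)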

#prepare(record) ⇒ Object




# File 'lib/logstash/outputs/kafka.rb', line 206

def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end

#register ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 178

def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil? 
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end


  @producer = create_producer
  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer" 
  end
end
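As the final branch shows, only the two stock Kafka serializers are accepted. To ship raw bytes instead of strings, the configuration would look something like this (value_serializer defaults to the StringSerializer):

output {
  kafka {
    topic_id => "mytopic"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}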

#retrying_send(batch) ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 229

def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    # Pair each record with its future so a failed future can be matched back
    # to the right record even if some send() calls raised synchronously above
    # (after .compact, positional indexes no longer line up with the batch).
    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        [record, @producer.send(record)]
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    futures.each do |record, future|
      begin
        future.get()
      rescue => e
        # TODO(sissel): Add metric to count failures, possibly by exception type.
        logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
        failures << record
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                                                                              :failures => failures.size,
                                                                              :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
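Both knobs used in this loop map to plugin options: retries (the @retries read above) caps the number of passes through the loop, with unset meaning retry indefinitely, and retry_backoff_ms (@retry_backoff_ms) sets the sleep between passes. A sketch, assuming those option names match your installed version:

output {
  kafka {
    topic_id => "mytopic"
    retries => 3             # retry a failed batch up to 3 times, then drop what's left
    retry_backoff_ms => 500  # sleep 500 / 1000.0 = 0.5 seconds between passes
  }
}

With retries unset (the default), a prolonged Kafka outage blocks the pipeline rather than losing events, which is usually what you want.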