Class: LogStash::Outputs::Kafka
- Inherits: Base
  - Object
    - Base
      - LogStash::Outputs::Kafka
- Defined in: lib/logstash/outputs/kafka.rb
Overview
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:
| Kafka Client Version | Logstash Version | Plugin Version | Why? |
|---|---|---|---|
| 0.8 | 2.0.0 - 2.x.x | <3.0.0 | Legacy, 0.8 is still popular |
| 0.9 | 2.0.0 - 2.3.x | 3.x.x | Works with the old Ruby Event API (`event['price'] = 10`) |
| 0.9 | 2.4.x - 5.x.x | 4.x.x | Works with the new getter/setter APIs (`event.set('[price]', 10)`) |
| 0.10.0.x | 2.4.x - 5.x.x | 5.x.x | Not compatible with the <= 0.9 broker |
| 0.10.1.x | 2.4.x - 5.x.x | 6.x.x | |
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, upgrade brokers before clients, because brokers target backward compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This output supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default, security is disabled, but it can be enabled as needed.
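As a rough illustration of turning on SSL, the fragment below assumes a plugin version that exposes the `security_protocol`, `ssl_truststore_location`, and `ssl_truststore_password` options; the truststore path and password are placeholders. Treat the option names as assumptions and confirm them against the configuration reference for your installed plugin version.

```ruby
output {
  kafka {
    topic_id => "mytopic"
    # Assumed option names; verify them for your plugin version.
    security_protocol => "SSL"
    ssl_truststore_location => "/path/to/truststore.jks"  # placeholder path
    ssl_truststore_password => "changeit"                 # placeholder password
  }
}
```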
The only required configuration is the `topic_id`. The default codec is plain, so events are persisted on the broker in plain format. By default, Logstash encodes each event with not only the message but also a timestamp and hostname. If you want only the message to pass through, configure the output like this:
```ruby
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
```
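To make the effect of `format => "%{message}"` concrete, here is a simplified Ruby imitation of how a plain-style codec might render an event. It is not Logstash's implementation: events are plain hashes, `default_plain` and `render_format` are hypothetical helpers, the default layout is assumed to be timestamp, host, and message separated by spaces, and only flat `%{field}` references are handled (no nested fields or date formats).

```ruby
# Hypothetical imitation of plain-codec rendering; events are plain hashes here.

# Assumed default layout: timestamp, host, and message joined by spaces.
def default_plain(event)
  "#{event['@timestamp']} #{event['host']} #{event['message']}"
end

# Replace each %{field} reference with the event's value for that field.
# In this sketch, unknown fields are left as the literal %{field} text.
def render_format(format, event)
  format.gsub(/%\{([^}]+)\}/) do
    key = Regexp.last_match(1)
    event.key?(key) ? event[key].to_s : "%{#{key}}"
  end
end

event = {
  '@timestamp' => '2017-01-01T00:00:00.000Z',
  'host' => 'web-1',
  'message' => 'hello world'
}

puts default_plain(event)               # 2017-01-01T00:00:00.000Z web-1 hello world
puts render_format('%{message}', event) # hello world
```

With only `%{message}` in the format string, the timestamp and hostname no longer reach the broker.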
For more information, see kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
Instance Method Summary
- #close ⇒ Object
- #multi_receive(events) ⇒ Object
- #prepare(record) ⇒ Object
  Adds a record to the calling thread's batch.
- #register ⇒ Object
- #retrying_send(batch) ⇒ Object
Instance Method Details
#close ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 291
def close
  @producer.close
end
```
#multi_receive(events) ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 211
def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
```
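The flow in `multi_receive` (encode each event, collect the encoded payloads in the calling thread's batch, then send and clear that batch) can be sketched in plain, single-threaded Ruby. Assumptions: `SketchCodec` and `MultiReceiveSketch` are hypothetical stand-ins, `SHUTDOWN` stands in for `LogStash::SHUTDOWN`, the send step is a block instead of `retrying_send`, and the codec callback is assumed to append each payload via `prepare` (the real callback, `write_to_kafka`, is not shown on this page).

```ruby
# Hypothetical codec stand-in: invokes the registered callback once per event.
class SketchCodec
  def on_event(&block)
    @callback = block
  end

  def encode(event)
    @callback.call(event, event.to_s)
  end
end

class MultiReceiveSketch
  SHUTDOWN = Object.new # stand-in for LogStash::SHUTDOWN

  def initialize(&sender)
    @sender = sender       # stand-in for retrying_send
    @thread_batch_map = {} # single-threaded sketch, so a plain Hash suffices
    @codec = SketchCodec.new
    # Assumption: the encode callback appends each payload via prepare.
    @codec.on_event { |_event, data| prepare(data) }
  end

  def prepare(record)
    @thread_batch_map[Thread.current] << record
  end

  def multi_receive(events)
    t = Thread.current
    @thread_batch_map[t] ||= []

    events.each do |event|
      break if event.equal?(SHUTDOWN) # stop encoding at the shutdown marker
      @codec.encode(event)
    end

    batch = @thread_batch_map[t]
    if batch.any?
      @sender.call(batch.dup) # dup so the receiver keeps a copy after clear
      batch.clear
    end
  end
end

sent = []
out = MultiReceiveSketch.new { |batch| sent << batch }
out.multi_receive(['a', 'b', MultiReceiveSketch::SHUTDOWN, 'c'])
p sent # [["a", "b"]]
```

Passing `batch.dup` is a choice specific to this sketch; in the listing, `retrying_send` runs synchronously, so it is finished with the batch before `batch.clear` executes.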
#prepare(record) ⇒ Object
Adds a record to the calling thread's batch.
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 206
def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
```
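Because `prepare` keys each batch by `Thread.current`, concurrent pipeline workers never append to the same list. The sketch below demonstrates that isolation with three threads. It is an illustration under assumptions: `PerThreadBatches` is a hypothetical class, a `Mutex`-guarded `Hash` stands in for `Concurrent::Hash`, a Ruby `Array` stands in for `java.util.ArrayList`, and batches are created on first use rather than in `multi_receive`.

```ruby
# Hypothetical per-thread batch store keyed by Thread.current.
class PerThreadBatches
  def initialize
    @map = {}
    @lock = Mutex.new # guards the shared Hash (stand-in for Concurrent::Hash)
  end

  # Returns the calling thread's batch, creating it on first use.
  def batch_for_current_thread
    @lock.synchronize { @map[Thread.current] ||= [] }
  end

  # Mirrors prepare(record): append only to the calling thread's batch.
  # No lock is needed for the append because no other thread sees this Array.
  def prepare(record)
    batch_for_current_thread << record
  end
end

batches = PerThreadBatches.new
threads = 3.times.map do |n|
  Thread.new do
    5.times { |i| batches.prepare("worker#{n}-#{i}") }
    batches.batch_for_current_thread.dup # snapshot of this thread's batch
  end
end
results = threads.map(&:value)

results.each { |b| puts b.size } # prints 5 for each thread
```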
#register ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 178
def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil?
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end

  @producer = create_producer
  if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data)
    end
  elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
    @codec.on_event do |event, data|
      write_to_kafka(event, data.to_java_bytes)
    end
  else
    raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
  end
end
```
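The retry validation and serializer dispatch in `register` can be reduced to a small pure-Ruby function. This sketch makes several assumptions: `build_value_converter` is a hypothetical helper, `ConfigurationError` is a local stand-in class, `String#bytes` (an array of integers) replaces JRuby's `to_java_bytes`, and the finite-retry warning is omitted.

```ruby
# Local stand-in for the plugin's ConfigurationError.
class ConfigurationError < StandardError; end

STRING_SERIALIZER = 'org.apache.kafka.common.serialization.StringSerializer'
BYTES_SERIALIZER  = 'org.apache.kafka.common.serialization.ByteArraySerializer'

# Hypothetical helper: validates settings, then returns a lambda that turns
# encoded codec output into the value handed to the producer.
def build_value_converter(value_serializer, retries)
  if !retries.nil? && retries < 0
    raise ConfigurationError, "A negative retry count (#{retries}) is not valid. Must be a value >= 0"
  end

  case value_serializer
  when STRING_SERIALIZER then ->(data) { data }       # pass the String through
  when BYTES_SERIALIZER  then ->(data) { data.bytes } # stand-in for to_java_bytes
  else
    raise ConfigurationError,
          "'value_serializer' only supports #{BYTES_SERIALIZER} and #{STRING_SERIALIZER}"
  end
end

to_string = build_value_converter(STRING_SERIALIZER, nil)
to_bytes  = build_value_converter(BYTES_SERIALIZER, 3)
p to_string.call('hi') # "hi"
p to_bytes.call('hi')  # [104, 105]
```

Resolving the conversion once at registration, as the listing does with `@codec.on_event`, keeps the per-event path free of serializer checks.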
#retrying_send(batch) ⇒ Object
```ruby
# File 'lib/logstash/outputs/kafka.rb', line 229
def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    futures.each_with_index do |future, i|
      begin
        result = future.get()
      rescue => e
        # TODO(sissel): Add metric to count failures, possibly by exception type.
        logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
        failures << batch[i]
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining.nil? || remaining >= 0
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                  :failures => failures.size, :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
```
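The control flow of `retrying_send` (a retry budget where `nil` means retry forever, resending only the records that failed, and backing off between attempts) is sketched below without Kafka. Assumptions: `retrying_send_sketch` and the `deliver` callable are hypothetical; `deliver` returns `true` on success and replaces both the synchronous exceptions and the failed futures of the listing; the backoff is given in seconds rather than `@retry_backoff_ms`; and instead of logging, the sketch returns the attempt count and the records still failing when the budget ran out.

```ruby
# Sketch of the retrying_send control flow without Kafka.
# deliver: hypothetical callable returning true when a record was sent.
def retrying_send_sketch(batch, retries:, backoff_s:, deliver:)
  remaining = retries # nil means "retry forever"
  attempts = 0
  dropped = []

  while batch.any?
    unless remaining.nil?
      if remaining < 0
        dropped = batch # budget exhausted: give up on what is still failing
        break
      end
      remaining -= 1
    end

    attempts += 1
    failures = batch.reject { |record| deliver.call(record) }
    break if failures.empty? # everything was delivered

    batch = failures # retry only the records that failed
    # Back off only if another attempt will follow.
    sleep(backoff_s) if remaining.nil? || remaining >= 0
  end

  { attempts: attempts, dropped: dropped }
end

# A hypothetical flaky sink: "b" fails on its first delivery only.
seen = Hash.new(0)
flaky = lambda do |record|
  seen[record] += 1
  !(record == 'b' && seen[record] == 1)
end

result = retrying_send_sketch(%w[a b c], retries: 2, backoff_s: 0, deliver: flaky)
puts result[:attempts]        # 2
puts result[:dropped].inspect # []

always_fail = ->(_record) { false }
result = retrying_send_sketch(%w[x y], retries: 1, backoff_s: 0, deliver: always_fail)
puts result[:attempts]        # 2
puts result[:dropped].inspect # ["x", "y"]
```

As in the listing, a `retries` value of N allows N + 1 send attempts in total, and leaving `retries` unset keeps retrying until every record is delivered.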