Class: LogStash::Outputs::Kafka
- Inherits: Base (Object < Base < LogStash::Outputs::Kafka)
- Includes: PluginMixins::Kafka::Common
- Defined in: lib/logstash/outputs/kafka.rb
Overview
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |2.4.x - 5.x.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, upgrade brokers before clients, because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This output supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default, security is disabled, but it can be turned on as needed.
The only required configuration is the topic_id. The default codec is plain, so events are persisted on the broker in plain format. With this codec, Logstash encodes each event with a timestamp and hostname in addition to the message. If you want only the message to pass through, configure the output like this:
[source,ruby]
    output {
      kafka {
        codec => plain {
          format => "%{message}"
        }
        topic_id => "mytopic"
      }
    }
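To illustrate what the format setting changes, here is a minimal plain-Ruby sketch of `%{field}` substitution. It is not Logstash's implementation: the `render` helper and the Hash used as an event are hypothetical, and the default layout shown (timestamp, host, message) is an assumption based on the description above.

```ruby
# Hypothetical helper: replace each %{field} reference with the field's value.
# References to unknown fields are left untouched.
def render(format, event)
  format.gsub(/%\{([^}]+)\}/) { |ref| event.fetch(ref[2..-2], ref).to_s }
end

# A plain Hash stands in for a Logstash event (assumption for illustration).
event = {
  "@timestamp" => "2024-01-01T12:00:00.000Z",
  "host"       => "web-01",
  "message"    => "user logged in"
}

# Assumed default layout: timestamp and host precede the message.
puts render("%{@timestamp} %{host} %{message}", event)

# With format => "%{message}", only the message text is written.
puts render("%{message}", event)
```

The second call corresponds to the configuration shown above, where only the message body reaches the topic.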
For more information see kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
Instance Method Summary
- #close ⇒ Object
- #multi_receive(events) ⇒ Object
- #prepare(record) ⇒ Object
- #register ⇒ Object
- #retrying_send(batch) ⇒ Object
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Instance Method Details
#close ⇒ Object
    # File 'lib/logstash/outputs/kafka.rb', line 313
    def close
      @producer.close
    end
#multi_receive(events) ⇒ Object
    # File 'lib/logstash/outputs/kafka.rb', line 222
    def multi_receive(events)
      t = Thread.current
      if !@thread_batch_map.include?(t)
        @thread_batch_map[t] = java.util.ArrayList.new(events.size)
      end

      events.each do |event|
        @codec.encode(event)
      end

      batch = @thread_batch_map[t]
      if batch.any?
        retrying_send(batch)
        batch.clear
      end
    end
#prepare(record) ⇒ Object
    # File 'lib/logstash/outputs/kafka.rb', line 217
    def prepare(record)
      # This output is threadsafe, so we need to keep a batch per thread.
      @thread_batch_map[Thread.current].add(record)
    end
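The per-thread batching used by #multi_receive and #prepare can be sketched in plain Ruby as follows. This is an illustrative stand-in, not the plugin's code: the `PerThreadBatcher` class is hypothetical, a Hash guarded by a Mutex replaces `Concurrent::Hash`, a Ruby Array replaces `java.util.ArrayList`, and upcasing a string stands in for the codec callback.

```ruby
# Hypothetical stand-in for the output: each thread accumulates its own batch,
# so concurrent calls never interleave records.
class PerThreadBatcher
  attr_reader :sent

  def initialize
    @batches = {}      # Thread => Array of pending records
    @lock = Mutex.new  # plain Hash + Mutex instead of Concurrent::Hash (assumption)
    @sent = []         # every flushed batch, recorded for inspection
  end

  # Mirrors #prepare: append to the batch owned by the calling thread.
  def prepare(record)
    batch_for(Thread.current) << record
  end

  # Mirrors #multi_receive: encode each event, then flush this thread's batch.
  def multi_receive(events)
    events.each { |event| prepare(event.to_s.upcase) } # stand-in for the codec
    batch = batch_for(Thread.current)
    return if batch.empty?

    @lock.synchronize { @sent << batch.dup } # stand-in for retrying_send(batch)
    batch.clear
  end

  private

  def batch_for(thread)
    @lock.synchronize { @batches[thread] ||= [] }
  end
end

batcher = PerThreadBatcher.new
workers = [%w[a b], %w[c d e]].map do |events|
  Thread.new { batcher.multi_receive(events) }
end
workers.each(&:join)

# Each thread's events arrive as one intact batch; order between threads may vary.
p batcher.sent.sort
```

Keying the map by `Thread.current` is what lets a single plugin instance be shared by several pipeline workers without locking around the whole send path.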
#register ⇒ Object
    # File 'lib/logstash/outputs/kafka.rb', line 185
    def register
      @thread_batch_map = Concurrent::Hash.new

      if !@retries.nil?
        if @retries < 0
          raise LogStash::ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
        end

        logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
      end

      reassign_dns_lookup

      if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
        @codec.on_event do |event, data|
          write_to_kafka(event, data)
        end
      elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
        @codec.on_event do |event, data|
          write_to_kafka(event, data.to_java_bytes)
        end
      else
        raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
      end

      @message_headers.each do |key, value|
        if !key.is_a? String
          raise LogStash::ConfigurationError, "'message_headers' contains a key that is not a string!"
        end
      end

      @producer = create_producer
    end
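The checks performed by #register can be exercised outside Logstash with a small sketch. Everything here is a stand-in under stated assumptions: `ConfigurationError` replaces `LogStash::ConfigurationError`, the helpers `payload_converter` and `validate_settings` are hypothetical names, and `String#bytes` approximates JRuby's `to_java_bytes`.

```ruby
# Stand-in for LogStash::ConfigurationError (assumption for illustration).
class ConfigurationError < StandardError; end

STRING_SERIALIZER     = "org.apache.kafka.common.serialization.StringSerializer"
BYTE_ARRAY_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"

# Hypothetical helper: pick how encoded data is turned into a record value.
def payload_converter(value_serializer)
  case value_serializer
  when STRING_SERIALIZER     then ->(data) { data }
  when BYTE_ARRAY_SERIALIZER then ->(data) { data.bytes } # approximates to_java_bytes
  else
    raise ConfigurationError,
          "'value_serializer' only supports #{BYTE_ARRAY_SERIALIZER} and #{STRING_SERIALIZER}"
  end
end

# Hypothetical helper: the retry and header checks, gathered in one place.
def validate_settings(retries, message_headers)
  if !retries.nil? && retries < 0
    raise ConfigurationError, "A negative retry count (#{retries}) is not valid. Must be a value >= 0"
  end

  message_headers.each_key do |key|
    unless key.is_a?(String)
      raise ConfigurationError, "'message_headers' contains a key that is not a string!"
    end
  end
  true
end

validate_settings(nil, { "source" => "logstash" })    # unlimited retries, valid headers
p payload_converter(BYTE_ARRAY_SERIALIZER).call("hi") # byte values of "hi"

begin
  validate_settings(-1, {})
rescue ConfigurationError => e
  puts "rejected: #{e.message}"
end
```

Failing at registration time means a misconfigured pipeline stops before any event is read, rather than dropping events later.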
#retrying_send(batch) ⇒ Object
    # File 'lib/logstash/outputs/kafka.rb', line 239
    def retrying_send(batch)
      remaining = @retries

      while batch.any?
        unless remaining.nil?
          if remaining < 0
            # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
            # DLQing would make things worse (you dlq data that would be successful
            # after the fault is repaired)
            logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                        :max_retries => @retries, :drop_count => batch.count)
            break
          end

          remaining -= 1
        end

        failures = []

        futures = batch.collect do |record|
          begin
            # send() can throw an exception even before the future is created.
            @producer.send(record)
          rescue org.apache.kafka.common.errors.InterruptException,
                 org.apache.kafka.common.errors.RetriableException => e
            logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
            failures << record
            nil
          rescue org.apache.kafka.common.KafkaException => e
            # This error is not retriable, drop event
            # TODO: add DLQ support
            logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message,
                        :record_value => record.value)
            nil
          end
        end

        futures.each_with_index do |future, i|
          # We cannot skip nils using `futures.compact` because then our index `i` will not align with `batch`
          unless future.nil?
            begin
              future.get
            rescue java.util.concurrent.ExecutionException => e
              # TODO(sissel): Add metric to count failures, possibly by exception type.
              if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
                 e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
                logger.info("producer send failed, will retry sending", :exception => e.cause.class,
                            :message => e.cause.message)
                failures << batch[i]
              elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
                # This error is not retriable, drop event
                # TODO: add DLQ support
                logger.warn("producer send failed, dropping record", :exception => e.cause.class,
                            :message => e.cause.message, :record_value => batch[i].value)
              end
            end
          end
        end

        # No failures? Cool. Let's move on.
        break if failures.empty?

        # Otherwise, retry with any failed transmissions
        if remaining.nil? || remaining >= 0
          delay = @retry_backoff_ms / 1000.0
          logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
                      :failures => failures.size, :sleep => delay)
          batch = failures
          sleep(delay)
        end
      end
    end
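The retry behaviour of #retrying_send can be tried without a Kafka producer using the simplified sketch below. It is a sketch under assumptions, not the plugin's code: `RetriableError` and `FatalError` are hypothetical stand-ins for Kafka's exception classes, the send happens synchronously through a block instead of futures, and the method returns the dropped records rather than logging them.

```ruby
# Hypothetical stand-ins for Kafka's retriable and non-retriable exceptions.
class RetriableError < StandardError; end
class FatalError < StandardError; end

# Sends each record through the block and returns the records that were dropped.
# retries = nil retries forever; retries = N allows N retries after the first attempt.
def retrying_send(batch, retries:, backoff_ms: 0)
  remaining = retries
  dropped = []

  while batch.any?
    unless remaining.nil?
      if remaining < 0
        dropped.concat(batch) # retry budget exhausted: these records are lost
        break
      end
      remaining -= 1
    end

    failures = []
    batch.each do |record|
      begin
        yield record
      rescue RetriableError
        failures << record # transient fault: try again on the next pass
      rescue FatalError
        dropped << record  # not retriable: drop immediately
      end
    end

    break if failures.empty?

    batch = failures # only the failed records are resent
    sleep(backoff_ms / 1000.0) if remaining.nil? || remaining >= 0
  end

  dropped
end

attempts = Hash.new(0)
dropped = retrying_send(%w[ok flaky bad], retries: 2) do |record|
  attempts[record] += 1
  raise FatalError if record == "bad"
  raise RetriableError if record == "flaky" && attempts[record] < 2
end

p dropped  # records that were given up on
p attempts # how many times each record was attempted
```

Resending only the failed records keeps successful sends from being duplicated within a batch, and a `nil` retry count trades possible blocking for no data loss, which matches the warning logged in #register.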