Class: LogStashLogger::Device::Kafka

Inherits:
Connectable show all
Defined in:
lib/logstash-logger/device/kafka.rb

Constant Summary collapse

DEFAULT_HOST =
'localhost'
DEFAULT_PORT =
9092
DEFAULT_TOPIC =
'logstash'
DEFAULT_PRODUCER =
'logstash-logger'
DEFAULT_BACKOFF =
1

Instance Attribute Summary collapse

Attributes inherited from Connectable

#buffer_logger

Attributes inherited from Base

#error_logger, #io, #sync

Instance Method Summary collapse

Methods inherited from Connectable

#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #to_io, #write

Methods included from Buffer

#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer

Methods inherited from Base

#close, #close!, #flush, #to_io, #unrecoverable_error?, #write

Constructor Details

#initialize(opts) ⇒ Kafka

Returns a new instance of Kafka.



15
16
17
18
19
20
21
22
23
24
# File 'lib/logstash-logger/device/kafka.rb', line 15

# Builds a Kafka device from an options hash.
#
# The bare +super+ forwards +opts+ unchanged to the parent initializer.
#
# @param opts [Hash] device options read below:
#   * +:hosts+    - broker list, stored as given when truthy
#   * +:host+     - comma-separated host names, used only when +:hosts+ is
#                   absent (falls back to DEFAULT_HOST)
#   * +:port+     - port appended to every entry derived from +:host+
#                   (falls back to DEFAULT_PORT)
#   * +:path+     - topic name (falls back to DEFAULT_TOPIC)
#   * +:producer+ - producer name (falls back to DEFAULT_PRODUCER)
#   * +:backoff+  - value slept before a reconnect (falls back to
#                   DEFAULT_BACKOFF)
def initialize(opts)
  super

  broker_names = opts[:host] || DEFAULT_HOST
  broker_port = opts[:port] || DEFAULT_PORT

  # An explicit :hosts list wins; otherwise expand "a,b" into ["a:port", "b:port"].
  @hosts = opts[:hosts]
  @hosts ||= broker_names.split(',').map { |name| "#{name}:#{broker_port}" }

  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff = opts[:backoff] || DEFAULT_BACKOFF
  @topic = opts[:path] || DEFAULT_TOPIC

  # NOTE(review): @buffer_group is not read anywhere in this view; presumably
  # consumed by the Buffer mixin -- confirm.
  @buffer_group = @topic
end

Instance Attribute Details

#backoff ⇒ Object

Returns the value of attribute backoff.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Returns the value of attribute backoff.
#
# This is the value passed to +sleep+ before a reconnect attempt in
# #with_connection; a falsy value skips the sleep. It is set in #initialize
# from opts[:backoff], falling back to DEFAULT_BACKOFF.
def backoff
  @backoff
end

#hosts ⇒ Object

Returns the value of attribute hosts.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Returns the value of attribute hosts.
#
# This is the broker list handed to Poseidon::Producer.new in #connect.
# #initialize stores opts[:hosts] as given, or builds "host:port" strings
# from the comma-separated opts[:host] and opts[:port].
def hosts
  @hosts
end

#producer ⇒ Object

Returns the value of attribute producer.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Returns the value of attribute producer.
#
# This is the configured producer name (opts[:producer] or DEFAULT_PRODUCER),
# not the Poseidon::Producer instance -- that object is stored in @io by
# #connect. The name is passed as the second argument to
# Poseidon::Producer.new (presumably the Kafka client id -- confirm against
# the Poseidon documentation).
def producer
  @producer
end

#topic ⇒ Object

Returns the value of attribute topic.



13
14
15
# File 'lib/logstash-logger/device/kafka.rb', line 13

# Returns the value of attribute topic.
#
# This is the default topic used by #write_batch when no topic argument is
# given. It is set in #initialize from opts[:path], falling back to
# DEFAULT_TOPIC.
def topic
  @topic
end

Instance Method Details

#connect ⇒ Object



26
27
28
# File 'lib/logstash-logger/device/kafka.rb', line 26

# Creates a new Poseidon producer from the configured broker list and
# producer name and stores it in @io, replacing any previous value.
#
# @return [Object] the newly created producer (the value assigned to @io)
def connect
  brokers = @hosts
  producer_name = @producer
  @io = ::Poseidon::Producer.new(brokers, producer_name)
end

#with_connection ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logstash-logger/device/kafka.rb', line 30

# Yields to the caller's block, connecting first if not already connected.
#
# Error handling:
# * Poseidon checksum / metadata-fetch errors are logged, then the method
#   sleeps for +backoff+ (when set), reconnects and re-runs its whole body.
#   There is no retry limit, so this repeats for as long as those errors
#   keep occurring.
# * Any other StandardError is logged and swallowed, and the device is
#   closed without flushing.
#
# @yield the work to perform while connected
# @return [Object] the block's value on success, or the return value of
#   +close+ after giving up
def with_connection
  connect unless connected?
  yield
rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => transient_error
  log_error(transient_error)
  log_warning('reconnect/retry')
  sleep(backoff) if backoff
  reconnect
  # Re-run from the top of the method, including the connected? check.
  retry
rescue => fatal_error
  log_error(fatal_error)
  log_warning('giving up')
  close(flush: false)
end

#write_batch(messages, topic = nil) ⇒ Object



45
46
47
48
49
50
# File 'lib/logstash-logger/device/kafka.rb', line 45

# Sends a batch of messages to Kafka through the current producer.
#
# Each message is wrapped in a Poseidon::MessageToSend addressed to the
# chosen topic, and the whole list is sent in one send_messages call inside
# #with_connection.
#
# @param messages [Enumerable] message payloads to send
# @param topic [String, nil] destination topic; falls back to @topic when
#   nil or false
# @return [Object] whatever #with_connection returns
def write_batch(messages, topic = nil)
  destination = topic || @topic
  with_connection do
    outgoing = messages.map { |payload| Poseidon::MessageToSend.new(destination, payload) }
    @io.send_messages(outgoing)
  end
end

#write_one(message, topic = nil) ⇒ Object



52
53
54
# File 'lib/logstash-logger/device/kafka.rb', line 52

# Sends a single message by delegating to #write_batch with a one-element
# batch.
#
# @param message [Object] the message payload
# @param topic [String, nil] destination topic, passed through unchanged
# @return [Object] whatever #write_batch returns
def write_one(message, topic = nil)
  single_batch = [message]
  write_batch(single_batch, topic)
end