Class: LogStashLogger::Device::Kafka
- Inherits:
-
Connectable
- Object
- Base
- Connectable
- LogStashLogger::Device::Kafka
- Defined in:
- lib/logstash-logger/device/kafka.rb
Constant Summary collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_PORT =
9092
- DEFAULT_TOPIC =
'logstash'
- DEFAULT_PRODUCER =
'logstash-logger'
- DEFAULT_BACKOFF =
1
Instance Attribute Summary collapse
-
#backoff ⇒ Object
Returns the value of attribute backoff.
-
#hosts ⇒ Object
Returns the value of attribute hosts.
-
#producer ⇒ Object
Returns the value of attribute producer.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes inherited from Connectable
Attributes inherited from Base
Instance Method Summary collapse
- #connect ⇒ Object
-
#initialize(opts) ⇒ Kafka
constructor
A new instance of Kafka.
- #with_connection ⇒ Object
- #write_batch(messages, topic = nil) ⇒ Object
- #write_one(message, topic = nil) ⇒ Object
Methods inherited from Connectable
#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #to_io, #write
Methods included from Buffer
#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer
Methods inherited from Base
#close, #close!, #flush, #to_io, #unrecoverable_error?, #write
Constructor Details
#initialize(opts) ⇒ Kafka
Returns a new instance of Kafka.
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/logstash-logger/device/kafka.rb', line 15 def initialize(opts) super host = opts[:host] || DEFAULT_HOST port = opts[:port] || DEFAULT_PORT @hosts = opts[:hosts] || host.split(',').map { |h| "#{h}:#{port}" } @topic = opts[:path] || DEFAULT_TOPIC @producer = opts[:producer] || DEFAULT_PRODUCER @backoff = opts[:backoff] || DEFAULT_BACKOFF @buffer_group = @topic end |
Instance Attribute Details
#backoff ⇒ Object
Returns the value of attribute backoff.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def backoff @backoff end |
#hosts ⇒ Object
Returns the value of attribute hosts.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def hosts @hosts end |
#producer ⇒ Object
Returns the value of attribute producer.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def producer @producer end |
#topic ⇒ Object
Returns the value of attribute topic.
13 14 15 |
# File 'lib/logstash-logger/device/kafka.rb', line 13 def topic @topic end |
Instance Method Details
#connect ⇒ Object
26 27 28 |
# File 'lib/logstash-logger/device/kafka.rb', line 26 def connect @io = ::Poseidon::Producer.new(@hosts, @producer) end |
#with_connection ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/logstash-logger/device/kafka.rb', line 30 def with_connection connect unless connected? yield rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => e log_error(e) log_warning("reconnect/retry") sleep backoff if backoff reconnect retry rescue => e log_error(e) log_warning("giving up") close(flush: false) end |
#write_batch(messages, topic = nil) ⇒ Object
45 46 47 48 49 50 |
# File 'lib/logstash-logger/device/kafka.rb', line 45 def write_batch(, topic = nil) topic ||= @topic with_connection do @io. .map { || Poseidon::MessageToSend.new(topic, ) } end end |
#write_one(message, topic = nil) ⇒ Object
52 53 54 |
# File 'lib/logstash-logger/device/kafka.rb', line 52 def write_one(, topic = nil) write_batch([], topic) end |