Class: LogStashLogger::Device::Kafka
- Inherits:
-
Connectable
- Object
- Base
- Connectable
- LogStashLogger::Device::Kafka
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash-logger/device/kafka.rb
Constant Summary collapse
- DEFAULT_HOST =
'localhost'
- DEFAULT_PORT =
9092
- DEFAULT_TOPIC =
'logstash'
- DEFAULT_PRODUCER =
'logstash-logger'
- DEFAULT_BACKOFF =
1
Instance Attribute Summary collapse
-
#backoff ⇒ Object
Returns the value of attribute backoff.
-
#hosts ⇒ Object
Returns the value of attribute hosts.
-
#producer ⇒ Object
Returns the value of attribute producer.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
- #flush(*args) ⇒ Object
-
#initialize(opts) ⇒ Kafka
constructor
A new instance of Kafka.
- #reconnect ⇒ Object
- #with_connection ⇒ Object
- #write(message) ⇒ Object
Methods inherited from Connectable
Methods inherited from Base
Constructor Details
#initialize(opts) ⇒ Kafka
Returns a new instance of Kafka.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/logstash-logger/device/kafka.rb', line 17
# Builds a Kafka device from an options hash and sets up event batching.
#
# @param opts [Hash] device options; the keys read here are:
#   :host          comma-separated broker host names (default DEFAULT_HOST)
#   :port          port appended to every entry of :host (default DEFAULT_PORT)
#   :hosts         ready-made broker list; when given, :host/:port are ignored
#   :path          topic name (default DEFAULT_TOPIC)
#   :producer      producer identifier (default DEFAULT_PRODUCER)
#   :backoff       value passed to `sleep` before a reconnect (default DEFAULT_BACKOFF)
#   :batch_events  buffer size handed to buffer_initialize as max_items (default 50)
#   :batch_timeout buffer interval handed to buffer_initialize as max_interval (default 5)
# @return [Kafka] a new instance of Kafka
def initialize(opts)
  super

  host = opts[:host] || DEFAULT_HOST
  port = opts[:port] || DEFAULT_PORT

  # Strip whitespace around each comma-separated entry and drop blank ones so
  # that "a, b" yields "a:9092" and "b:9092" rather than " b:9092".
  @hosts = opts[:hosts] ||
           host.split(',').map(&:strip).reject(&:empty?).map { |h| "#{h}:#{port}" }

  @topic = opts[:path] || DEFAULT_TOPIC
  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff = opts[:backoff] || DEFAULT_BACKOFF

  @batch_events = opts.fetch(:batch_events, 50)
  @batch_timeout = opts.fetch(:batch_timeout, 5)

  buffer_initialize max_items: @batch_events, max_interval: @batch_timeout
end
Instance Attribute Details
#backoff ⇒ Object
Returns the value of attribute backoff.
15 16 17 |
# File 'lib/logstash-logger/device/kafka.rb', line 15
# Returns the value of attribute backoff.
#
# @return [Object] whatever is currently stored in @backoff (nil when unset)
def backoff
  @backoff
end
#hosts ⇒ Object
Returns the value of attribute hosts.
15 16 17 |
# File 'lib/logstash-logger/device/kafka.rb', line 15
# Returns the value of attribute hosts.
#
# @return [Object] whatever is currently stored in @hosts (nil when unset)
def hosts
  @hosts
end
#producer ⇒ Object
Returns the value of attribute producer.
15 16 17 |
# File 'lib/logstash-logger/device/kafka.rb', line 15
# Returns the value of attribute producer.
#
# @return [Object] whatever is currently stored in @producer (nil when unset)
def producer
  @producer
end
#topic ⇒ Object
Returns the value of attribute topic.
15 16 17 |
# File 'lib/logstash-logger/device/kafka.rb', line 15
# Returns the value of attribute topic.
#
# @return [Object] whatever is currently stored in @topic (nil when unset)
def topic
  @topic
end
Instance Method Details
#close ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/logstash-logger/device/kafka.rb', line 59
# Flushes any buffered events one last time and closes the producer.
#
# Any StandardError raised while flushing or closing is reported through
# `warn` instead of being propagated, and @io is always reset to nil.
#
# @return [Object, nil] result of the last expression evaluated (not meaningful)
def close
  buffer_flush(final: true)
  @io && @io.close
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message}"
ensure
  # Drop the reference even when the flush or close failed.
  @io = nil
end
#connect ⇒ Object
32 33 34 |
# File 'lib/logstash-logger/device/kafka.rb', line 32
# Instantiates a Poseidon producer from @hosts and @producer and keeps it in @io.
#
# @return [Object] the newly created producer
def connect
  brokers = @hosts
  producer_id = @producer
  @io = ::Poseidon::Producer.new(brokers, producer_id)
end
#flush(*args) ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/logstash-logger/device/kafka.rb', line 68
# Flushes events.
#
# With no arguments this triggers a flush of the internal buffer. With
# arguments, the first argument is taken as the message(s) to deliver, and
# they are sent through the producer inside with_connection; any further
# arguments are ignored.
#
# @param args [Array] optional; args.first holds the message or messages to send
# @return [Object] result of buffer_flush or of with_connection
def flush(*args)
  if args.empty?
    buffer_flush
  else
    # Splat normalises a single message, an array of messages, or nil to an Array.
    messages = *args.first
    with_connection do
      @io.send_messages messages
    end
  end
end
#reconnect ⇒ Object
36 37 38 39 |
# File 'lib/logstash-logger/device/kafka.rb', line 36
# Closes the current producer, if there is one, and opens a new connection.
#
# @return [Object] whatever connect returns
def reconnect
  # @io may be nil (never connected, or cleared after an error); guard the
  # close the same way #close does so reconnecting cannot raise NoMethodError.
  @io && @io.close
  connect
end
#with_connection ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/logstash-logger/device/kafka.rb', line 41
# Yields with a producer available in @io, connecting first when needed.
#
# Poseidon checksum / metadata errors trigger a warning, an optional sleep of
# `backoff`, a reconnect and a retry of the whole method. Any other
# StandardError is reported through `warn` and @io is reset to nil so the next
# call connects again.
#
# @yield the work to perform while connected
# @return [Object, nil] the block's value, or nil after giving up
def with_connection
  connect unless @io
  yield
rescue ::Poseidon::Errors::ChecksumError, ::Poseidon::Errors::UnableToFetchMetadata => e
  warn "#{self.class} - #{e.class} -> reconnect/retry"
  sleep backoff if backoff
  reconnect
  # NOTE(review): this retry is unbounded; it loops for as long as the broker
  # keeps raising one of the errors above. Confirm that is intended.
  retry
rescue => e
  warn "#{self.class} - #{e.class} - #{e.message} -> giving up"
  @io = nil
end
#write(message) ⇒ Object
54 55 56 57 |
# File 'lib/logstash-logger/device/kafka.rb', line 54
# Wraps the message for the configured topic and queues it in the buffer.
#
# When @sync is truthy the buffer is force-flushed right after queuing.
#
# @param message [Object] payload to publish to @topic
# @return [Object, nil] result of the forced flush, or nil when not syncing
def write(message)
  buffer_receive ::Poseidon::MessageToSend.new(@topic, message)
  buffer_flush(force: true) if @sync
end