Class: LogStashLogger::Device::AwsStream

Inherits:
Connectable show all
Defined in:
lib/logstash-logger/device/aws_stream.rb

Direct Known Subclasses

Firehose, Kinesis

Constant Summary collapse

DEFAULT_STREAM =
'logstash'

Class Attribute Summary collapse

Instance Attribute Summary collapse

Attributes inherited from Connectable

#buffer_logger

Attributes inherited from Base

#error_logger, #io, #sync

Instance Method Summary collapse

Methods inherited from Connectable

#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #reset, #to_io, #write

Methods included from Buffer

#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer

Methods inherited from Base

#close, #flush, #reset, #to_io, #unrecoverable_error?, #write

Constructor Details

#initialize(opts) ⇒ AwsStream

Returns a new instance of AwsStream.



22
23
24
25
26
27
28
# File 'lib/logstash-logger/device/aws_stream.rb', line 22

# Builds the device from an options hash.
#
# The options are first handed to the parent initializer, then the
# AWS-specific settings are captured for later use by #connect.
#
# @param opts [Hash] device options; the keys read here are
#   :aws_access_key_id, :aws_secret_access_key, :aws_region and :stream
# @return [AwsStream] a new instance of AwsStream
def initialize(opts)
  super(opts)
  # Fall back to DEFAULT_STREAM when no (or a falsy) :stream is supplied.
  @stream = opts[:stream] || DEFAULT_STREAM
  @aws_region = opts[:aws_region]
  @secret_access_key = opts[:aws_secret_access_key]
  @access_key_id = opts[:aws_access_key_id]
end

Class Attribute Details

.recoverable_error_codes ⇒ Object

Returns the value of attribute recoverable_error_codes.



17
18
19
# File 'lib/logstash-logger/device/aws_stream.rb', line 17

# Class-level reader for the collection of error codes treated as retryable.
#
# #write_batch calls +include?+ on this value with each response record's
# error_code to decide whether the record is written again.
# NOTE(review): the assignment is not visible here — presumably each
# subclass (Firehose, Kinesis) sets it; confirm.
#
# @return [Object] the value of the @recoverable_error_codes class instance variable
def recoverable_error_codes
  @recoverable_error_codes
end

.stream_class ⇒ Object

Returns the value of attribute stream_class.



17
18
19
# File 'lib/logstash-logger/device/aws_stream.rb', line 17

# Class-level reader for the client class used to talk to the stream.
#
# #connect instantiates it with the client options hash, and #write_batch
# uses its +name+ in log messages.
# NOTE(review): the assignment is not visible here — presumably each
# subclass (Firehose, Kinesis) sets it; confirm.
#
# @return [Object] the value of the @stream_class class instance variable
def stream_class
  @stream_class
end

Instance Attribute Details

#aws_region ⇒ Object

Returns the value of attribute aws_region.



20
21
22
# File 'lib/logstash-logger/device/aws_stream.rb', line 20

# Reader for the AWS region this device was configured with.
#
# The value comes from opts[:aws_region] in the constructor and may be nil,
# in which case #connect does not pass a :region option to the client.
#
# @return [Object, nil] the value of @aws_region
def aws_region
  @aws_region
end

#stream ⇒ Object

Returns the value of attribute stream.



20
21
22
# File 'lib/logstash-logger/device/aws_stream.rb', line 20

# Reader for the configured stream identifier.
#
# The constructor sets it from opts[:stream], falling back to DEFAULT_STREAM.
#
# @return [Object] the value of @stream
def stream
  @stream
end

Instance Method Details

#close! ⇒ Object



87
88
89
# File 'lib/logstash-logger/device/aws_stream.rb', line 87

# Discards the current client by clearing @io.
#
# No shutdown call is made on the client object itself; the reference
# assigned by #connect is simply dropped.
#
# @return [nil]
def close!
  @io = nil
end

#connect ⇒ Object



46
47
48
49
50
51
# File 'lib/logstash-logger/device/aws_stream.rb', line 46

# Creates a new client of the class-level stream_class and stores it in @io.
#
# Only explicitly configured settings are passed to the client:
# * :credentials is set when BOTH the access key id and the secret key
#   are non-nil;
# * :region is set when a region is non-nil.
# NOTE(review): with a setting omitted, the client presumably falls back to
# the AWS SDK's own default resolution — confirm against the SDK docs.
#
# @return [Object] the newly created client (also assigned to @io)
def connect
  client_opts = {}

  has_explicit_credentials = !@access_key_id.nil? && !@secret_access_key.nil?
  if has_explicit_credentials
    client_opts[:credentials] = Aws::Credentials.new(@access_key_id, @secret_access_key)
  end

  client_opts[:region] = @aws_region unless @aws_region.nil?

  @io = self.class.stream_class.new(client_opts)
end

#get_response_records(resp) ⇒ Object



42
43
44
# File 'lib/logstash-logger/device/aws_stream.rb', line 42

# Abstract hook: extracts the per-record results from a put response.
#
# #write_batch iterates the returned collection with each_with_index and
# reads +error_code+ / +error_message+ from every element. This base
# implementation always raises.
#
# @param resp [Object] the value returned by #put_records
# @raise [NotImplementedError] always, in this base class
def get_response_records(resp)
  raise NotImplementedError
end

#is_successful_response(resp) ⇒ Object



38
39
40
# File 'lib/logstash-logger/device/aws_stream.rb', line 38

# Abstract hook: tells whether a put response reports complete success.
#
# #write_batch inspects individual records only when this returns a falsy
# value. This base implementation always raises.
#
# @param resp [Object] the value returned by #put_records
# @raise [NotImplementedError] always, in this base class
def is_successful_response(resp)
  raise NotImplementedError
end

#put_records(records) ⇒ Object



34
35
36
# File 'lib/logstash-logger/device/aws_stream.rb', line 34

# Abstract hook: sends a batch of already-transformed records.
#
# #write_batch passes the array produced by mapping #transform_message over
# the messages and hands the return value to #is_successful_response and
# #get_response_records. This base implementation always raises.
#
# @param records [Array] transformed records to send
# @raise [NotImplementedError] always, in this base class
def put_records(records)
  raise NotImplementedError
end

#transform_message(message) ⇒ Object



30
31
32
# File 'lib/logstash-logger/device/aws_stream.rb', line 30

# Abstract hook: converts one log message into a record for #put_records.
#
# #write_batch later reads +[:data]+ from the returned record when it
# retries a failed entry. This base implementation always raises.
#
# @param message [Object] a single message to convert
# @raise [NotImplementedError] always, in this base class
def transform_message(message)
  raise NotImplementedError
end

#with_connection ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/logstash-logger/device/aws_stream.rb', line 53

# Runs the given block with a connection in place.
#
# Connects first when #connected? reports no connection. Any StandardError
# raised while connecting or inside the block is logged, followed by a
# "giving up" warning, and the device is closed without flushing; the error
# is NOT re-raised.
#
# @yield the work to perform while connected
# @return [Object] the block's value on success, or the result of
#   close(flush: false) when an error was rescued
def with_connection
  begin
    connected? || connect
    yield
  rescue StandardError => error
    log_error(error)
    log_warning("giving up")
    close(flush: false)
  end
end

#write_batch(messages, group = nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/logstash-logger/device/aws_stream.rb', line 62

# Sends a batch of messages, re-writing records that failed recoverably.
#
# Every message is converted with #transform_message and the resulting
# records are sent via #put_records inside #with_connection. When the
# response is not fully successful, each response record is examined:
# * error code listed in the class-level recoverable_error_codes: two
#   warnings are logged and the record's +:data+ is passed to #write again;
# * any other non-nil, non-empty error code: an error is logged and the
#   record is dropped;
# * nil or empty error code: nothing is done for that record.
#
# @param messages [Array] messages to send
# @param group [Object, nil] not used by this implementation
# @return [Object] whatever #with_connection returns
def write_batch(messages, group = nil)
  records = messages.map { |message| transform_message(message) }

  with_connection do
    response = put_records(records)

    # Nothing to inspect when the whole batch went through.
    next if is_successful_response(response)

    # Response records are matched to the sent records by position.
    get_response_records(response).each_with_index do |result, position|
      code = result.error_code

      if self.class.recoverable_error_codes.include?(code)
        log_warning("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{result.error_message}")
        log_warning("Retrying")
        # Hand the failed record's payload back to #write so it is sent again.
        write(records[position][:data])
      elsif !(code.nil? || code == '')
        log_error("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{result.error_message}")
      end
    end
  end
end

#write_one(message) ⇒ Object



83
84
85
# File 'lib/logstash-logger/device/aws_stream.rb', line 83

# Sends a single message by wrapping it in a one-element batch.
#
# @param message [Object] the message to send
# @return [Object] whatever #write_batch returns
def write_one(message)
  single_batch = [message]
  write_batch(single_batch)
end