Class: LogStashLogger::Device::AwsStream
Constant Summary
collapse
- DEFAULT_STREAM =
'logstash'
Class Attribute Summary collapse
Instance Attribute Summary collapse
Attributes inherited from Connectable
#buffer_logger
Attributes inherited from Base
#error_logger, #io, #sync
Instance Method Summary
collapse
Methods inherited from Connectable
#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #reset, #to_io, #write
Methods included from Buffer
#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer
Methods inherited from Base
#close, #flush, #reset, #to_io, #unrecoverable_error?, #write
Constructor Details
#initialize(opts) ⇒ AwsStream
Returns a new instance of AwsStream.
22
23
24
25
26
27
28
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 22
# Builds the device from an options hash.
#
# Reads :aws_access_key_id, :aws_secret_access_key, :aws_region and :stream
# from +opts+; the stream name falls back to DEFAULT_STREAM when :stream is
# missing or nil. The same +opts+ are forwarded unchanged to the parent
# initializer via bare +super+.
#
# @param opts [Hash] device options
def initialize(opts)
  super
  @access_key_id, @secret_access_key, @aws_region =
    opts.values_at(:aws_access_key_id, :aws_secret_access_key, :aws_region)
  @stream = opts[:stream] || DEFAULT_STREAM
end
|
Class Attribute Details
.recoverable_error_codes ⇒ Object
Returns the value of attribute recoverable_error_codes.
17
18
19
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 17
# Class-level reader for the +recoverable_error_codes+ attribute.
#
# @return [Object] whatever is currently stored in @recoverable_error_codes
def recoverable_error_codes
  @recoverable_error_codes
end
|
.stream_class ⇒ Object
Returns the value of attribute stream_class.
17
18
19
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 17
# Class-level reader for the +stream_class+ attribute.
#
# @return [Object] whatever is currently stored in @stream_class
def stream_class
  @stream_class
end
|
Instance Attribute Details
#aws_region ⇒ Object
Returns the value of attribute aws_region.
20
21
22
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 20
# Reader for the +aws_region+ attribute.
#
# @return [Object] whatever is currently stored in @aws_region
def aws_region
  @aws_region
end
|
#stream ⇒ Object
Returns the value of attribute stream.
20
21
22
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 20
# Reader for the +stream+ attribute.
#
# @return [Object] whatever is currently stored in @stream
def stream
  @stream
end
|
Instance Method Details
#close! ⇒ Object
87
88
89
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 87
# Forgets the current client by resetting @io to nil.
#
# @return [nil]
def close!
  @io = nil
end
|
#connect ⇒ Object
46
47
48
49
50
51
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 46
# Instantiates the configured stream class and stores the instance in @io.
#
# The options hash handed to the client only carries :credentials when both
# the access key id and the secret access key are non-nil, and only carries
# :region when a region is non-nil; otherwise those keys are left out.
#
# @return [Object] the newly created client (also assigned to @io)
def connect
  options = {}

  unless @access_key_id.nil? || @secret_access_key.nil?
    options[:credentials] = Aws::Credentials.new(@access_key_id, @secret_access_key)
  end

  options[:region] = @aws_region unless @aws_region.nil?

  @io = self.class.stream_class.new(options)
end
|
#get_response_records(resp) ⇒ Object
42
43
44
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 42
# Placeholder that always raises; there is no implementation at this level.
#
# @param resp [Object] response object (unused here)
# @raise [NotImplementedError] always
def get_response_records(resp)
  raise NotImplementedError
end
|
#is_successful_response(resp) ⇒ Object
38
39
40
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 38
# Placeholder that always raises; there is no implementation at this level.
#
# @param resp [Object] response object (unused here)
# @raise [NotImplementedError] always
def is_successful_response(resp)
  raise NotImplementedError
end
|
#put_records(records) ⇒ Object
34
35
36
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 34
# Placeholder that always raises; there is no implementation at this level.
#
# @param records [Object] records to send (unused here)
# @raise [NotImplementedError] always
def put_records(records)
  raise NotImplementedError
end
|
#transform_message(message) ⇒ Object
30
31
32
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 30
# Placeholder that always raises; there is no implementation at this level.
#
# @param message [Object] message to convert (unused here)
# @raise [NotImplementedError] always
def transform_message(message)
  raise NotImplementedError
end
|
#with_connection ⇒ Object
53
54
55
56
57
58
59
60
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 53
# Runs the given block, calling +connect+ first when +connected?+ is falsy.
#
# Any StandardError raised while connecting or inside the block is passed to
# +log_error+, followed by a "giving up" warning and +close(flush: false)+.
# The error is not re-raised.
#
# @yield the work to perform
# @return [Object] the block's value, or the result of +close+ after an error
def with_connection
  begin
    connect unless connected?
    yield
  rescue => error
    log_error(error)
    log_warning("giving up")
    close(flush: false)
  end
end
|
#write_batch(messages, group = nil) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 62
# Transforms every message, sends the batch with +put_records+ inside
# +with_connection+, and inspects the per-record results when the response is
# not reported as successful.
#
# For each returned record:
# * an error code listed in the class's +recoverable_error_codes+ is logged as
#   a warning and the matching record's :data is handed back to +write+;
# * any other non-nil, non-empty error code is logged as an error;
# * records with a nil or empty error code are skipped.
#
# @param messages [Enumerable] messages to send
# @param group [Object, nil] accepted but not used by this method
# @return [Object] whatever +with_connection+ returns
def write_batch(messages, group = nil)
  payloads = messages.map { |message| transform_message(message) }

  with_connection do
    response = put_records(payloads)
    next if is_successful_response(response)

    # Response records are matched to the sent payloads by position.
    get_response_records(response).each_with_index do |entry, position|
      code = entry.error_code

      if self.class.recoverable_error_codes.include?(code)
        log_warning("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{entry.error_message}")
        log_warning("Retrying")
        write(payloads[position][:data])
      elsif !(code.nil? || code == '')
        log_error("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{entry.error_message}")
      end
    end
  end
end
|
#write_one(message) ⇒ Object
83
84
85
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 83
# Sends a single message by wrapping it in a one-element array and
# delegating to +write_batch+.
#
# @param message [Object] the message to send
# @return [Object] whatever +write_batch+ returns
def write_one(message)
  single_batch = [message]
  write_batch(single_batch)
end
|