Class: LogStash::Outputs::CloudWatchLogs

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/outputs/cloudwatchlogs.rb

Overview

This output lets you send log data to the AWS CloudWatch Logs service.

Defined Under Namespace

Classes: Buffer

Constant Summary collapse

LOG_GROUP_NAME =

Constants

"log_group_name"
LOG_STREAM_NAME =
"log_stream_name"
SEQUENCE_TOKEN =
"sequence_token"
TIMESTAMP =
"@timestamp"
MESSAGE =
"message"
PER_EVENT_OVERHEAD =
26
MAX_BATCH_SIZE =
1024 * 1024
MAX_BATCH_COUNT =
10000
MAX_DISTANCE_BETWEEN_EVENTS =
86400 * 1000
MIN_DELAY =
0.2
MIN_BUFFER_DURATION =
5000
MAX_BACKOFF_IN_SECOND =

Back off for up to 64 seconds upon failure.

64

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#bufferObject (readonly)

Only accessed by tests



79
80
81
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 79

# Reader for @buffer, the Buffer instance that #register creates and that
# #receive enqueues events into. According to the attribute notes it is
# only accessed by tests.
def buffer
  @buffer
end

#cwlObject

Returns the value of attribute cwl.



76
77
78
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 76

# Reader for @cwl. #register assigns it an Aws::CloudWatchLogs::Client and
# #teardown resets it to nil, so it may be nil outside that window.
def cwl
  @cwl
end

#last_flushObject

Returns the value of attribute last_flush.



76
77
78
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 76

# Reader for @last_flush. #register initialises it to Time.now.to_f, i.e.
# epoch seconds as a Float; any later updates happen outside the code
# shown here.
def last_flush
  @last_flush
end

#sequence_tokenObject

Returns the value of attribute sequence_token.



76
77
78
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 76

# Reader for @sequence_token, which #register resets to nil.
# NOTE(review): presumably this carries the CloudWatch Logs sequence token
# between uploads -- confirm against put_log_events.
def sequence_token
  @sequence_token
end

Instance Method Details

#flush(events) ⇒ Object



161
162
163
164
165
166
167
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 161

# Uploads a batch of buffered events.
#
# The events are first split into batches by prepare_log_events, and each
# resulting batch is handed to put_log_events in order.
#
# @param events [Array, nil] buffered events; nil or empty means nothing to do
# @return [nil] when there is nothing to send; otherwise the collection
#   returned by prepare_log_events
def flush(events)
  return if events.nil? || events.empty?

  prepare_log_events(events).each { |batch| put_log_events(batch) }
end

#receive(event) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 132

# Handles one event coming from the pipeline.
#
# * Events rejected by output? are ignored.
# * The LogStash::SHUTDOWN sentinel closes the buffer, waits for the
#   publisher thread, logs the shutdown and calls finished.
# * Events rejected by invalid? are dropped.
# * Everything else is either passed to the codec (when @use_codec is set)
#   or enqueued directly with its timestamp in milliseconds and its
#   MESSAGE field.
#
# @param event [Object] the pipeline event or the shutdown sentinel
# @return [Object, nil] nil for ignored events and for shutdown; otherwise
#   whatever @codec.encode or @buffer.enq returns
def receive(event)
  return unless output?(event)

  unless event == LogStash::SHUTDOWN
    return if invalid?(event)
    return @codec.encode(event) if @use_codec

    millis = event.timestamp.time.to_f * 1000
    return @buffer.enq({ :timestamp => millis, :message => event[MESSAGE] })
  end

  # Shutdown: stop accepting events, then let the publisher drain and exit.
  @buffer.close
  @publisher.join
  @logger.info("CloudWatch Logs output plugin shutdown.")
  finished
  nil
end

#registerObject



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 82

# Prepares the plugin for use.
#
# Steps, in order:
# 1. Creates the CloudWatch Logs client from aws_options_hash.
# 2. Clamps :batch_count, :batch_size and :buffer_duration to the limits
#    given by MAX_BATCH_COUNT, MAX_BATCH_SIZE and MIN_BUFFER_DURATION,
#    logging a warning whenever a value is adjusted.
# 3. Expands the %instance_id%, %hostname% and %ipv4% placeholders in
#    @log_stream_name (in place).
# 4. Creates the event Buffer and starts the publisher thread, which hands
#    every dequeued batch to #flush.
# 5. When @use_codec is set, wires the codec so encoded payloads are
#    enqueued.
#
# Placeholders are resolved before the publisher thread is started so that
# a failure aborts registration without leaving a background thread behind.
#
# @raise [LogStash::ConfigurationError] if %instance_id% cannot be resolved
#   because the metadata endpoint answers with a non-success status, or if
#   %ipv4% is requested but the host has no non-loopback IPv4 address
def register
  require "aws-sdk"
  @cwl = Aws::CloudWatchLogs::Client.new(aws_options_hash)

  if @batch_count > MAX_BATCH_COUNT
    @logger.warn(":batch_count exceeds the max number of log events. Use #{MAX_BATCH_COUNT} instead.")
    @batch_count = MAX_BATCH_COUNT
  end
  if @batch_size > MAX_BATCH_SIZE
    @logger.warn(":batch_size exceeds the max size of log events. Use #{MAX_BATCH_SIZE} instead.")
    @batch_size = MAX_BATCH_SIZE
  end
  if @buffer_duration < MIN_BUFFER_DURATION
    @logger.warn(":buffer_duration is smaller than the min value. Use #{MIN_BUFFER_DURATION} instead.")
    @buffer_duration = MIN_BUFFER_DURATION
  end

  if @log_stream_name.include? "%instance_id%"
    require "net/http"
    response = Net::HTTP.get_response(URI.parse("http://169.254.169.254/latest/meta-data/instance-id"))
    # Net::HTTP.get would return the body of an error response as well;
    # never let an error page end up in the stream name.
    unless response.is_a?(Net::HTTPSuccess)
      raise LogStash::ConfigurationError,
        "Unable to resolve %instance_id% in :log_stream_name: instance metadata request returned HTTP #{response.code}"
    end
    @log_stream_name.gsub!("%instance_id%", response.body)
  end

  if @log_stream_name.include? "%hostname%"
    require "socket"
    @log_stream_name.gsub!("%hostname%", Socket.gethostname)
  end

  if @log_stream_name.include? "%ipv4%"
    require "socket"
    address = Socket.ip_address_list.find { |ai| ai.ipv4? && !ai.ipv4_loopback? }
    # find returns nil when only loopback/IPv6 addresses are configured.
    if address.nil?
      raise LogStash::ConfigurationError,
        "Unable to resolve %ipv4% in :log_stream_name: no non-loopback IPv4 address found on this host"
    end
    @log_stream_name.gsub!("%ipv4%", address.ip_address)
  end

  @sequence_token = nil
  @last_flush = Time.now.to_f
  @buffer = Buffer.new(
    max_batch_count: batch_count, max_batch_size: batch_size,
    buffer_duration: @buffer_duration, out_queue_size: @queue_size, logger: @logger,
    # Each event is charged its message bytes plus PER_EVENT_OVERHEAD.
    size_of_item_proc: Proc.new {|event| event[:message].bytesize + PER_EVENT_OVERHEAD})
  @publisher = Thread.new do
    @buffer.deq do |batch|
      flush(batch)
    end
  end

  if @use_codec
    @codec.on_event() {|event, payload| @buffer.enq({:timestamp => event.timestamp.time.to_f*1000,
      :message => payload})}
  end
end

#teardownObject



152
153
154
155
156
157
158
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 152

# Releases the plugin's resources.
#
# Closes the buffer, waits for the publisher thread to finish, drops the
# CloudWatch Logs client and finally calls finished. The buffer is closed
# before joining the thread, in the same order as the shutdown path in
# #receive.
#
# @buffer and @publisher are only touched when present, so teardown also
# works when #register did not run to completion.
#
# @return [Object] whatever finished returns
def teardown
  @logger.info("Going to clean up resources")
  @buffer.close if @buffer
  @publisher.join if @publisher
  @cwl = nil
  finished
end