Class: LogStash::Outputs::S3

Inherits: Base
Includes: PluginMixins::AwsConfig
Defined in: lib/logstash/outputs/s3.rb

Overview

INFORMATION:

This plugin batches and uploads Logstash events to Amazon Simple Storage Service (Amazon S3).

Requirements:

  • An Amazon S3 bucket and S3 access credentials (typically access_key_id and secret_access_key)

  • The S3 PutObject permission

  • Run Logstash as superuser to establish the connection

The S3 output writes temporary files to “/opt/logstash/S3_temp/”. If you want, you can change this path at the start of the register method.

S3 output files have the following format:

ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

  • “ls.s3”: indicates the Logstash S3 plugin.

  • “ip-10-228-27-95”: the IP address (hostname) of the machine that produced the file.

  • “2013-04-18T10.00”: the time the file was created, used in combination with the time_file setting.

  • “tag_hello”: the event’s tag.

  • “part0”: when size_file is specified, additional parts are generated whenever the current file grows beyond size_file.

When a file is full, it is pushed to the bucket and then deleted from the temporary directory.
If a file is empty, it is simply deleted; empty files are never pushed to S3.

Crash Recovery:

  • This plugin will recover and upload temporary log files after crash/abnormal termination

Note regarding time_file and size_file:

Both the time_file and size_file settings can trigger a log “file rotation”. A rotation pushes the current log “part” to S3 and deletes it from local temporary storage.

If you specify BOTH size_file and time_file, one file is created for each tag (if specified). A log rotation is triggered when EITHER time_file minutes have elapsed OR the log file grows larger than size_file.

If you ONLY specify time_file but NOT size_file, one file is created for each tag (if specified). A log rotation is triggered when time_file minutes elapse (see the minimal sketch after this note).

If you ONLY specify size_file but NOT time_file, one file is created for each tag (if specified). A log rotation is triggered when the size of the current log part exceeds size_file.

If NEITHER size_file nor time_file is specified, only one file is created for each tag (if specified). WARNING: since no log rotation is ever triggered, the upload to S3 only happens when Logstash restarts.
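For instance, a minimal sketch of the time_file-only case (credentials and bucket name are illustrative placeholders, not taken from the plugin):

output {
  s3 {
    access_key_id => "my_key"
    secret_access_key => "my_secret"
    bucket => "my_bucket"
    time_file => 5      # rotate and upload every 5 minutes; no size-based rotation
  }
}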

#### Usage: This is an example of a Logstash config:

output {
  s3 {
    access_key_id => "crazy_key"             (required)
    secret_access_key => "monkey_access_key" (required)
    endpoint_region => "eu-west-1"           (required) - Deprecated
    bucket => "boss_please_open_your_bucket" (required)
    size_file => 2048                        (optional) - Bytes
    time_file => 5                           (optional) - Minutes
    format => "plain"                        (optional)
    canned_acl => "private"                  (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private")
  }
}

Constant Summary

TEMPFILE_EXTENSION = "txt"
S3_INVALID_CHARACTERS = /[\^`><]/

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#page_counter ⇒ Object (readonly)

Returns the value of attribute page_counter.



# File 'lib/logstash/outputs/s3.rb', line 140

def page_counter
  @page_counter
end

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.



# File 'lib/logstash/outputs/s3.rb', line 141

def s3
  @s3
end

#tempfile ⇒ Object

Exposed attributes for testing purposes.



# File 'lib/logstash/outputs/s3.rb', line 139

def tempfile
  @tempfile
end

Instance Method Details

#aws_s3_config ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 143

def aws_s3_config
  @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
  @s3 = AWS::S3.new(full_options)
end

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 160

def aws_service_endpoint(region)
  # Make the deprecated endpoint_region work
  # TODO: (ph) Remove this after deprecation.

  if @endpoint_region
    region_to_use = @endpoint_region
  else
    region_to_use = @region
  end

  return {
    :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
  }
end
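As a hedged illustration of the mapping above (return values inferred from the source shown, assuming the deprecated endpoint_region option is not set):

  aws_service_endpoint("us-east-1")
  # => { :s3_endpoint => "s3.amazonaws.com" }

  aws_service_endpoint("eu-west-1")
  # => { :s3_endpoint => "s3-eu-west-1.amazonaws.com" }

If endpoint_region is set, it takes precedence over the region argument.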

#close ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 349

def close
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @file_rotation_lock.synchronize do
    @tempfile.close unless @tempfile.nil? && @tempfile.closed?
  end
end

#create_temporary_file ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 200

def create_temporary_file
  filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))

  @logger.debug("S3: Creating a new temporary file", :filename => filename)

  @file_rotation_lock.synchronize do
    unless @tempfile.nil?
      @tempfile.close
    end

    @tempfile = File.open(filename, "a")
  end
end

#full_options ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 148

def full_options
  aws_options_hash.merge(signature_options)
end

#get_temporary_filename(page_counter = 0) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 305

def get_temporary_filename(page_counter = 0)
  current_time = Time.now
  filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"

  if @tags.size > 0
    return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  else
    return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  end
end
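For illustration, on a host named "ip-10-228-27-95" with tags => ["hello"] (hostname and timestamp are example values matching the filename shown in the overview):

  get_temporary_filename(0)
  # => "ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt"

  get_temporary_filename(1)   # next part after a size-based rotation
  # => "ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part1.txt"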

#move_file_to_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 283

def move_file_to_bucket(file)
  if !File.zero?(file)
    write_on_bucket(file)
    @logger.debug("S3: File was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
  end

  begin
    File.delete(file)
  rescue Errno::ENOENT
    # Something else deleted the file, logging but not raising the issue
    @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file))
  rescue Errno::EACCES
    @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
  end
end

#periodic_interval ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 300

def periodic_interval
  @time_file * 60
end

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 317

def receive(event)

  @codec.encode(event)
end

#register ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 215

def register
  require "aws-sdk"
  # required if using ruby version < 2.0
  # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
  AWS.eager_autoload!(AWS::S3)

  workers_not_supported

  @s3 = aws_s3_config
  @upload_queue = Queue.new
  @file_rotation_lock = Mutex.new

  if @prefix && @prefix =~ S3_INVALID_CHARACTERS
    @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
    raise LogStash::ConfigurationError, "S3: prefix contains invalid characters"
  end

  if !Dir.exist?(@temporary_directory)
    FileUtils.mkdir_p(@temporary_directory)
  end

  test_s3_write

  restore_from_crashes if @restore == true
  reset_page_counter
  create_temporary_file
  configure_periodic_rotation if time_file != 0
  configure_upload_workers

  @codec.on_event do |event, encoded_event|
    handle_event(encoded_event)
  end
end

#restore ⇒ Object

IMPORTANT: if you run multiple instances of the s3 output, you should set “restore => true” on one of them and “restore => false” on the others. This is a workaround so that the new files are not destroyed after the initial files have been restored. If you do not specify “restore => true” and Logstash crashes or is restarted, the pending files are not sent to the bucket (for example, if you run a single instance).



# File 'lib/logstash/outputs/s3.rb', line 105

config :restore, :validate => :boolean, :default => false
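A sketch of the multi-instance setup described above (bucket name and other settings are illustrative placeholders):

  # Instance A: the only instance that restores and uploads leftover temp files
  output {
    s3 {
      bucket => "my_bucket"
      restore => true
    }
  }

  # Instances B..N: skip crash recovery so restored files are not clobbered
  output {
    s3 {
      bucket => "my_bucket"
      restore => false
    }
  }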

#restore_from_crashes ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 272

def restore_from_crashes
  @logger.debug("S3: Checking for temp files from a previoius crash...")

  Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
    name_file = File.basename(file)
    @logger.warn("S3: Found temporary file from crash.  Uploading file to S3.", :filename => name_file)
    move_file_to_bucket_async(file)
  end
end

#rotate_events_log? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/s3.rb', line 323

def rotate_events_log?
  @file_rotation_lock.synchronize do
    @tempfile.size > @size_file
  end
end

#signature_options ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 152

def signature_options
  if @signature_version
    {:s3_signature_version => @signature_version}
  else
    {}
  end
end

#test_s3_write ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 253

def test_s3_write
  @logger.debug("S3: Creating a test file on S3")

  test_filename = File.join(@temporary_directory,
                            "logstash-programmatic-access-test-object-#{Time.now.to_i}")

  File.open(test_filename, 'a') do |file|
    file.write('test')
  end

  begin
    write_on_bucket(test_filename)
    delete_on_bucket(test_filename)
  ensure
    File.delete(test_filename)
  end
end

#write_events_to_multiple_files? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/s3.rb', line 330

def write_events_to_multiple_files?
  @size_file > 0
end

#write_on_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 176

def write_on_bucket(file)
  # find and use the bucket
  bucket = @s3.buckets[@bucket]

  remote_filename = "#{@prefix}#{File.basename(file)}"

  @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)

  File.open(file, 'r') do |fileIO|
    begin
      # prepare for write the file
      object = bucket.objects[remote_filename]
      object.write(fileIO, :acl => @canned_acl, :server_side_encryption => @server_side_encryption ? :aes256 : nil)
    rescue AWS::Errors::Base => error
      @logger.error("S3: AWS error", :error => error)
      raise LogStash::Error, "AWS Configuration Error, #{error}"
    end
  end

  @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket  => @bucket, :canned_acl => @canned_acl)
end
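As the source shows, the object key is simply the configured prefix concatenated with the local file's basename; a small sketch (prefix and file name are illustrative):

  prefix = "logs/"
  file   = "/opt/logstash/S3_temp/ls.s3.ip-10-228-27-95.2013-04-18T10.00.part0.txt"
  remote_filename = "#{prefix}#{File.basename(file)}"
  # => "logs/ls.s3.ip-10-228-27-95.2013-04-18T10.00.part0.txt"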

#write_to_tempfile(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 335

def write_to_tempfile(event)
  begin
    @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))

    @file_rotation_lock.synchronize do
      @tempfile.syswrite(event)
    end
  rescue Errno::ENOSPC
    @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
    close
  end
end