Class: S3Downloader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/inputs/s3/downloader.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger, stop_semaphore, options) ⇒ S3Downloader

Returns a new instance of S3Downloader.



7
8
9
10
11
12
13
14
# File 'lib/logstash/inputs/s3/downloader.rb', line 7

def initialize(logger, stop_semaphore, options)
  @logger = logger
  @stopped = stop_semaphore
  @factory = options[:s3_client_factory]
  @delete_on_success = options[:delete_on_success]
  @move_to_bucket = options[:move_to_bucket]
  @include_object_properties = options[:include_object_properties]
end

Instance Method Details

#cleanup_local_object(record) ⇒ Object



37
38
39
40
41
# File 'lib/logstash/inputs/s3/downloader.rb', line 37

def cleanup_local_object(record)
  FileUtils.remove_entry_secure(record[:local_file], true) if ::File.exists?(record[:local_file])
rescue Exception => e
  @logger.warn("Could not delete file", :file => record[:local_file], :error => e)
end

#cleanup_s3object(record) ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/logstash/inputs/s3/downloader.rb', line 43

def cleanup_s3object(record)
  return unless @delete_on_success || @move_to_bucket
  begin
    @factory.get_s3_client(record[:bucket]) do |s3|
      s3.delete_object(bucket: record[:bucket], key: record[:key])
    end
  rescue Exception => e
    @logger.warn("Failed to delete s3 object", :record => record, :error => e)
  end
end

#copy_s3object_to_disk(record) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/logstash/inputs/s3/downloader.rb', line 16

def copy_s3object_to_disk(record)
  # (from docs) WARNING:
  # yielding data to a block disables retries of networking errors!
  begin
    @factory.get_s3_client(record[:bucket]) do |s3|
      response = s3.get_object(
        bucket: record[:bucket],
        key: record[:key],
        response_target: record[:local_file]
      )
      record[:s3_data] = response.to_h.keep_if { |key| @include_object_properties.include?(key) }
    end
  rescue Aws::S3::Errors::ServiceError => e
    @logger.error("Unable to download file. Requeuing the message", :error => e, :record => record)
    # prevent sqs message deletion
    throw :skip_delete
  end
  throw :skip_delete if stop?
  return true
end

#move_s3object(record) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/logstash/inputs/s3/downloader.rb', line 54

def move_s3object(record)
  return unless @move_to_bucket
  begin
    @factory.get_s3_client(@move_to_bucket) do |s3|
      s3.copy_object(bucket: @move_to_bucket, copy_source: "/#{record[:bucket]}/#{record[:key]}", key: record[:key])
    end
  rescue Exception => e
    @logger.warn("Failed to move s3 object", :record => record, :error => e)
  end
end

#stop?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/logstash/inputs/s3/downloader.rb', line 65

def stop?
  @stopped.value
end