Class: S3Downloader
- Inherits:
-
Object
- Object
- S3Downloader
- Defined in:
- lib/logstash/inputs/s3/downloader.rb
Instance Method Summary collapse
- #cleanup_local_object(record) ⇒ Object
- #cleanup_s3object(record) ⇒ Object
- #copy_s3object_to_disk(record) ⇒ Object
-
#initialize(logger, stop_semaphore, options) ⇒ S3Downloader
constructor
A new instance of S3Downloader.
- #move_s3object(record) ⇒ Object
- #stop? ⇒ Boolean
Constructor Details
#initialize(logger, stop_semaphore, options) ⇒ S3Downloader
Returns a new instance of S3Downloader.
7 8 9 10 11 12 13 14 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 7
# Stores the collaborators and settings used by the other methods.
#
# @param logger [Object] logger kept in @logger
# @param stop_semaphore [Object] shutdown flag holder kept in @stopped
# @param options [Hash] settings (hash-like, indexed by symbol); the keys
#   read are :s3_client_factory, :delete_on_success, :move_to_bucket and
#   :include_object_properties. Missing keys yield nil.
def initialize(logger, stop_semaphore, options)
  @logger = logger
  @stopped = stop_semaphore
  @factory = options[:s3_client_factory]
  @delete_on_success = options[:delete_on_success]
  @move_to_bucket = options[:move_to_bucket]
  @include_object_properties = options[:include_object_properties]
end
Instance Method Details
#cleanup_local_object(record) ⇒ Object
37 38 39 40 41 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 37
# Removes the local copy named by record[:local_file], if it exists.
# Failures are logged as warnings instead of being raised (best effort).
#
# @param record [Hash] must provide :local_file, the path to remove
def cleanup_local_object(record)
  local_file = record[:local_file]
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name.
  FileUtils.remove_entry_secure(local_file, true) if ::File.exist?(local_file)
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests still propagate.
  @logger.warn("Could not delete file", :file => record[:local_file], :error => e)
end
#cleanup_s3object(record) ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 43
# Deletes the S3 object identified by record[:bucket] / record[:key].
# Does nothing unless @delete_on_success or @move_to_bucket is set.
# Failures are logged as warnings instead of being raised (best effort).
#
# @param record [Hash] must provide :bucket and :key
def cleanup_s3object(record)
  return unless @delete_on_success || @move_to_bucket

  @factory.get_s3_client(record[:bucket]) do |s3|
    s3.delete_object(bucket: record[:bucket], key: record[:key])
  end
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests still propagate.
  @logger.warn("Failed to delete s3 object", :record => record, :error => e)
end
#copy_s3object_to_disk(record) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 16
# Downloads the object record[:bucket] / record[:key] into the path given by
# record[:local_file] and stores the selected response properties in
# record[:s3_data].
#
# Throws :skip_delete (instead of returning) when the download raises an
# Aws::S3::Errors::ServiceError or when a stop has been requested.
#
# @param record [Hash] must provide :bucket, :key and :local_file;
#   :s3_data is written on success
# @return [true] when the download finished and no stop was requested
def copy_s3object_to_disk(record)
  # (from docs) WARNING:
  # yielding data to a block disables retries of networking errors!
  begin
    @factory.get_s3_client(record[:bucket]) do |client|
      download = client.get_object(
        bucket: record[:bucket],
        key: record[:key],
        response_target: record[:local_file]
      )
      # Keep only the response entries whose key is listed in
      # @include_object_properties.
      properties = download.to_h
      properties.keep_if { |name| @include_object_properties.include?(name) }
      record[:s3_data] = properties
    end
  rescue Aws::S3::Errors::ServiceError => e
    @logger.error("Unable to download file. Requeuing the message", :error => e, :record => record)
    # prevent sqs message deletion
    throw :skip_delete
  end

  throw :skip_delete if stop?
  true
end
#move_s3object(record) ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 54
# Copies the object record[:bucket] / record[:key] into @move_to_bucket under
# the same key. Does nothing when @move_to_bucket is not set. This method only
# copies; it does not delete the source object.
# Failures are logged as warnings instead of being raised (best effort).
#
# @param record [Hash] must provide :bucket and :key of the source object
def move_s3object(record)
  return unless @move_to_bucket

  @factory.get_s3_client(@move_to_bucket) do |s3|
    s3.copy_object(
      bucket: @move_to_bucket,
      copy_source: "/#{record[:bucket]}/#{record[:key]}",
      key: record[:key]
    )
  end
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests still propagate.
  @logger.warn("Failed to move s3 object", :record => record, :error => e)
end
#stop? ⇒ Boolean
65 66 67 |
# File 'lib/logstash/inputs/s3/downloader.rb', line 65
# Reports the current value held by the stop semaphore given to the
# constructor.
#
# @return [Boolean] whatever @stopped.value returns
def stop?
  @stopped.value
end