Class: LogStash::Inputs::S3L
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::S3L
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/inputs/s3l.rb
Overview
Files ending in `.gz` are handled as gzip'ed files.
Defined Under Namespace
Modules: SinceDB
Instance Method Summary collapse
- #backup_to_bucket(object) ⇒ Object
- #backup_to_dir(filename) ⇒ Object
- #list_new_files ⇒ Object
- #process_files(queue) ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
Instance Method Details
#backup_to_bucket(object) ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/logstash/inputs/s3l.rb', line 135
#
# Copies +object+ into the configured backup bucket under a key built from
# @backup_add_prefix followed by the object's own key, then deletes the
# source object when @delete is truthy. Does nothing when @backup_to_bucket
# is nil.
#
# @param object an object responding to #key, #bucket_name and #delete
# @return the result of object.delete when @delete is set, otherwise nil
def backup_to_bucket(object)
  return if @backup_to_bucket.nil?

  destination_key = "#{@backup_add_prefix}#{object.key}"
  destination = @backup_bucket.object(destination_key)
  # copy_source is given as "<source bucket>/<source key>".
  destination.copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")

  object.delete if @delete
end
#backup_to_dir(filename) ⇒ Object
146 147 148 149 150 |
# File 'lib/logstash/inputs/s3l.rb', line 146
#
# Copies the file at +filename+ into the directory named by @backup_to_dir.
# Does nothing when @backup_to_dir is nil.
#
# @param filename path of the file to copy
def backup_to_dir(filename)
  return if @backup_to_dir.nil?

  FileUtils.cp(filename, @backup_to_dir)
end
#list_new_files ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/logstash/inputs/s3l.rb', line 110
#
# Lists the keys of the objects under @prefix in @s3bucket that should be
# processed: keys not rejected by ignore_filename?, whose last_modified is
# reported as newer by sincedb, and whose content_length is greater than zero.
#
# AWS service errors raised while listing are logged and swallowed, so the
# method returns whatever was collected before the failure (possibly nothing).
#
# @return [Array] matching keys, ordered by ascending last_modified
def list_new_files
  objects = {}
  found = false

  begin
    @s3bucket.objects(:prefix => @prefix).each do |log|
      found = true
      @logger.debug("S3 input: Found key", :key => log.key)

      if ignore_filename?(log.key)
        @logger.debug('S3 input: Ignoring', :key => log.key)
        next
      end

      next unless sincedb.newer?(log.last_modified) && log.content_length > 0

      objects[log.key] = log.last_modified
      @logger.debug("S3 input: Adding to objects[]", :key => log.key)
      @logger.debug("objects[] length is: ", :length => objects.length)
    end

    @logger.info('S3 input: No files found in bucket', :prefix => @prefix) unless found
  rescue Aws::Errors::ServiceError => e
    # Report the exception's message; the listing as given had a truncated
    # `e.` here, which does not parse.
    @logger.error("S3 input: Unable to list objects in bucket", :prefix => @prefix, :message => e.message)
  end

  # Order by modification time so older objects come first.
  objects.keys.sort_by { |key| objects[key] }
end
#process_files(queue) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/logstash/inputs/s3l.rb', line 153
#
# Fetches the keys returned by list_new_files and hands each one to
# process_log together with +queue+, stopping early as soon as stop? is true.
#
# @param queue passed through unchanged to process_log
def process_files(queue)
  list_new_files.each do |key|
    # Checked before every key so a stop request interrupts the batch.
    break if stop?

    @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
    process_log(queue, key)
  end
end
#register ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/logstash/inputs/s3l.rb', line 74
#
# Prepares the input for use: loads its dependencies, resolves the source
# bucket into @s3bucket, and makes sure the optional backup bucket, the
# optional backup directory and the temporary directory exist.
def register
  # Loaded here rather than at file level, as in the original listing.
  require "fileutils"
  require "digest/md5"
  require "aws-sdk-resources"

  @logger.info("Registering s3 input", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket})
    rescue Aws::S3::Errors::NoSuchBucket
      # NOTE(review): confirm which error class head_bucket raises for a
      # missing bucket; only NoSuchBucket triggers bucket creation here.
      s3.create_bucket({ :bucket => @backup_to_bucket})
    end
  end

  unless @backup_to_dir.nil?
    # File.exist? instead of File.exists?, which was removed in Ruby 3.2.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end
#run(queue) ⇒ Object
102 103 104 105 106 107 |
# File 'lib/logstash/inputs/s3l.rb', line 102
#
# Records the calling thread in @current_thread (read later by #stop) and
# then invokes process_files(queue) from within Stud.interval(@interval).
#
# @param queue passed through unchanged to process_files
def run(queue)
  @current_thread = Thread.current

  Stud.interval(@interval) { process_files(queue) }
end
#stop ⇒ Object
167 168 169 170 171 172 |
# File 'lib/logstash/inputs/s3l.rb', line 167
#
# Stops the polling loop started by #run.
def stop
  # #stop is called from a different thread than #run, so Stud.stop! must be
  # given the thread to stop explicitly; #run stored it in @current_thread.
  Stud.stop!(@current_thread)
end