Class: LogStash::Inputs::S3
- Defined in:
- lib/logstash/inputs/s3.rb
Overview
Stream events from files from a S3 bucket.
Each line from each file generates an event. Files ending in ‘.gz’ are handled as gzip’ed files.
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes inherited from Base
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
Methods inherited from Base
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Inputs::Base
Instance Method Details
#register ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/logstash/inputs/s3.rb', line 63 def register require "digest/md5" require "aws-sdk" @region_endpoint = @region if !@region.empty? @logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint) if @credentials.nil? @access_key_id = ENV['AWS_ACCESS_KEY_ID'] @secret_access_key = ENV['AWS_SECRET_ACCESS_KEY'] elsif @credentials.is_a? Array if @credentials.length ==1 File.open(@credentials[0]) { |f| f.each do |line| unless (/^\#/.match(line)) if(/\s*=\s*/.match(line)) param, value = line.split('=', 2) param = param.chomp().strip() value = value.chomp().strip() if param.eql?('AWS_ACCESS_KEY_ID') @access_key_id = value elsif param.eql?('AWS_SECRET_ACCESS_KEY') @secret_access_key = value end end end end } elsif @credentials.length == 2 @access_key_id = @credentials[0] @secret_access_key = @credentials[1] else raise ArgumentError.new('Credentials must be of the form "/path/to/file" or ["id", "secret"]') end end if @access_key_id.nil? or @secret_access_key.nil? raise ArgumentError.new('Missing AWS credentials') end if @bucket.nil? raise ArgumentError.new('Missing AWS bucket') end if @sincedb_path.nil? if ENV['HOME'].nil? raise ArgumentError.new('No HOME or sincedb_path set') end @sincedb_path = File.join(ENV["HOME"], ".sincedb_" + Digest::MD5.hexdigest("#{@bucket}+#{@prefix}")) end s3 = AWS::S3.new( :access_key_id => @access_key_id, :secret_access_key => @secret_access_key, :region => @region_endpoint ) @s3bucket = s3.buckets[@bucket] unless @backup_to_bucket.nil? @backup_bucket = s3.buckets[@backup_to_bucket] unless @backup_bucket.exists? s3.buckets.create(@backup_to_bucket) end end unless @backup_to_dir.nil? Dir.mkdir(@backup_to_dir, 0700) unless File.exists?(@backup_to_dir) end end |
#run(queue) ⇒ Object
135 136 137 138 139 140 141 |
# File 'lib/logstash/inputs/s3.rb', line 135 def run(queue) loop do process_new(queue) sleep(@interval) end finished end |