Class: LogStash::Inputs::S3L

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/s3l.rb

Overview

Files ending in `.gz` are handled as gzip'ed files.

Defined Under Namespace

Modules: SinceDB

Instance Method Summary collapse

Instance Method Details

#backup_to_bucket(object) ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/logstash/inputs/s3l.rb', line 135

# Copies +object+ into the backup bucket, prefixing its key with
# @backup_add_prefix, and deletes the source object afterwards when
# @delete is set. Does nothing when @backup_to_bucket is nil.
#
# object - responds to #key, #bucket_name and #delete.
def backup_to_bucket(object)
  return if @backup_to_bucket.nil?

  target = @backup_bucket.object("#{@backup_add_prefix}#{object.key}")
  target.copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")

  # The source is only removed after the copy call above has returned.
  object.delete if @delete
end

#backup_to_dir(filename) ⇒ Object



146
147
148
149
150
# File 'lib/logstash/inputs/s3l.rb', line 146

# Copies the local file +filename+ into @backup_to_dir.
# Does nothing when @backup_to_dir is nil.
def backup_to_dir(filename)
  FileUtils.cp(filename, @backup_to_dir) unless @backup_to_dir.nil?
end

#list_new_files ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/logstash/inputs/s3l.rb', line 110

# Lists the keys under @prefix in @s3bucket that should be processed.
#
# A key is kept when it is not rejected by ignore_filename?, its
# last_modified passes sincedb.newer?, and its content_length is positive.
#
# Returns an Array of keys ordered by last_modified (oldest first). Keys
# sharing the same timestamp are ordered by key so the result is
# deterministic; a plain sort on the timestamp alone leaves ties in an
# unspecified order.
#
# Aws::Errors::ServiceError raised while listing is logged, not re-raised;
# whatever was collected before the failure is still returned.
def list_new_files
  objects = {}
  found = false
  begin
    @s3bucket.objects(:prefix => @prefix).each do |log|
      found = true
      @logger.debug("S3 input: Found key", :key => log.key)
      if ignore_filename?(log.key)
        @logger.debug('S3 input: Ignoring', :key => log.key)
      elsif sincedb.newer?(log.last_modified) && log.content_length > 0
        objects[log.key] = log.last_modified
        @logger.debug("S3 input: Adding to objects[]", :key => log.key)
        @logger.debug("objects[] length is: ", :length => objects.length)
      end
    end
    @logger.info('S3 input: No files found in bucket', :prefix => prefix) unless found
  rescue Aws::Errors::ServiceError => e
    @logger.error("S3 input: Unable to list objects in bucket", :prefix => prefix, :message => e.message)
  end
  # Timestamp first, key second: the key breaks ties between equal timestamps.
  objects.keys.sort_by { |key| [objects[key], key] }
end

#process_files(queue) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logstash/inputs/s3l.rb', line 153

# Processes every key returned by list_new_files, in order, handing each
# one to process_log together with +queue+. Iteration ends early as soon
# as stop? returns true.
def process_files(queue)
  list_new_files.each do |s3_key|
    break if stop?

    @logger.debug("S3 input processing", :bucket => @bucket, :key => s3_key)
    process_log(queue, s3_key)
  end
end

#register ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/inputs/s3l.rb', line 74

# Prepares the input for use: loads its dependencies, resolves the source
# bucket (and the backup bucket when @backup_to_bucket is set), and makes
# sure the local backup and temporary directories exist.
#
# Sets @s3bucket, and @backup_bucket when a backup bucket is configured.
def register
  require "fileutils"
  require "digest/md5"
  require "aws-sdk-resources"

  @logger.info("Registering s3 input", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket })
    # NOTE(review): a HEAD response has no error body, so a missing bucket
    # may surface as NotFound rather than NoSuchBucket -- confirm against
    # the SDK version in use. Both are treated as "bucket does not exist".
    rescue Aws::S3::Errors::NoSuchBucket, Aws::S3::Errors::NotFound
      s3.create_bucket({ :bucket => @backup_to_bucket })
    end
  end

  unless @backup_to_dir.nil?
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end

#run(queue) ⇒ Object



102
103
104
105
106
107
# File 'lib/logstash/inputs/s3l.rb', line 102

# Entry point of the input: remembers the calling thread in
# @current_thread, then has Stud.interval invoke process_files(queue)
# with @interval as its interval argument.
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval) { process_files(queue) }
end

#stop ⇒ Object



167
168
169
170
171
172
# File 'lib/logstash/inputs/s3l.rb', line 167

# Asks Stud to stop the loop running on the thread recorded by #run.
#
# #stop is called from a different thread than #run, so the thread that
# #run stored in @current_thread has to be passed to Stud.stop! explicitly.
def stop
  run_thread = @current_thread
  Stud.stop!(run_thread)
end