Class: LogStash::Inputs::S3SNSSQS

Inherits:
Threadable
  • Object
Includes:
LogProcessor, PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/s3snssqs.rb

Overview

Fetches log files from AWS S3 buckets in response to object-created events delivered via SQS.

This plugin is based on the logstash-input-sqs plugin but doesn’t log the SQS event itself. Instead, it assumes that the event is an S3 object-created event and downloads and processes the referenced file.
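As a rough illustration of what treating a message as an S3 object-created event involves, the sketch below pulls bucket, key and size out of a message body. It relies only on the publicly documented S3 event notification layout (`Records[].s3.bucket.name`, `Records[].s3.object.key`, `Records[].s3.object.size`) and on the SNS envelope's `Message` field. The method name `extract_s3_records` and its `from_sns` keyword are hypothetical and are not taken from the plugin's source.

```ruby
require 'json'
require 'cgi'

# Hypothetical helper: extract S3 object references from an SQS message body.
# When the event was relayed through SNS, the S3 event is a JSON string
# stored in the envelope's "Message" field.
def extract_s3_records(body, from_sns: true)
  payload = JSON.parse(body)
  payload = JSON.parse(payload['Message']) if from_sns && payload.key?('Message')

  records = payload['Records'] || []
  created = records.select { |r| r['eventName'].to_s.start_with?('ObjectCreated:') }

  created.map do |r|
    {
      bucket: r.dig('s3', 'bucket', 'name'),
      # object keys arrive URL-encoded in S3 event notifications
      key: CGI.unescape(r.dig('s3', 'object', 'key').to_s),
      size: r.dig('s3', 'object', 'size')
    }
  end
end
```

Events other than object-created ones are dropped by the `select`, so a message carrying only such records yields an empty list.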

Some issues of logstash-input-sqs, like logstash not shutting down properly, have been fixed for this plugin.

In contrast to logstash-input-sqs, this plugin uses the “Receive Message Wait Time” configured for the SQS queue in question. A value of around 10 seconds is a good choice, since it keeps the shutdown time of Logstash reasonable. Also use a “Default Visibility Timeout” that is high enough for log files to be downloaded and processed (5-10 minutes should suit most use cases). The plugin will not remove the event from the queue if the associated log file could not be passed correctly to the processing level of Logstash (e.g. the downloaded content size doesn’t match the size given in the SQS event).
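The keep-or-delete decision described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the plugin's code: `download_complete?` and `handle_download` are hypothetical names, and the return values `:delete` and `:keep` merely stand for "remove the message from the queue" and "leave it to reappear after the visibility timeout".

```ruby
# Hypothetical check: the local file must have exactly the size
# announced in the event.
def download_complete?(path, expected_size)
  File.file?(path) && File.size(path) == expected_size
end

# Hypothetical handler: returns :delete only when the file was downloaded
# completely and the given block processed it without raising.
def handle_download(path, expected_size)
  return :keep unless download_complete?(path, expected_size)

  yield path
  :delete
rescue StandardError
  :keep
end
```

Because a kept message becomes visible again only after the visibility timeout expires, a timeout that is too short can lead to a file being picked up a second time while it is still being processed.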

This plugin is meant for high-availability setups. In contrast to logstash-input-s3, you can safely use multiple Logstash nodes, since SQS ensures that each log file is processed only once and that no file gets lost on node failure or when an auto-scaling group scales down. (Use a “Message Retention Period” of at least 4 days for your queue so that you can survive a weekend of faulty log file processing.) The plugin will not delete objects from S3 buckets, so configure a reasonable “Lifecycle” for your buckets that keeps the files for at least as long as the “Message Retention Period”.

A typical setup consists of some S3 buckets containing ELB, CloudTrail or other log files. These buckets are configured to send object-created events to an SQS queue, which in turn is configured as the source queue for this plugin. (The plugin supports gzipped content if it is marked with “content-encoding: gzip”, as is the case for CloudTrail logs.)
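To make the gzip handling concrete, here is a small sketch using only Ruby's standard `zlib` library. The method `read_log_lines` and its `content_encoding` keyword are hypothetical; how the plugin itself reads the object's encoding is not shown on this page.

```ruby
require 'zlib'
require 'stringio'

# Hypothetical helper: return the lines of a downloaded object,
# decompressing first when it is marked as gzip-encoded.
def read_log_lines(raw, content_encoding: nil)
  io = StringIO.new(raw)
  io = Zlib::GzipReader.new(io) if content_encoding == 'gzip'
  io.read.lines.map(&:chomp)
end
```

Reading the whole object into memory keeps the sketch short; for large log files a streaming loop over the reader would be the more careful choice.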

The Logstash node therefore needs SQS permissions plus permission to download objects from the S3 buckets that send events to the queue. (If Logstash nodes are running on EC2, you should use an IAM role to provide these permissions.)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:Get*",
                "sqs:List*",
                "sqs:ReceiveMessage",
                "sqs:ChangeMessageVisibility*",
                "sqs:DeleteMessage*"
            ],
            "Resource": [
                "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::my-elb-logs",
                "arn:aws:s3:::my-elb-logs/*"
            ]
        }
    ]
}

Instance Method Summary

Methods included from LogProcessor

included, #process

Instance Method Details

#register ⇒ Object

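Initialisation: builds the per-bucket credential, region, codec and type lookups, then instantiates the SQS poller, S3 client factory, S3 downloader and codec factory.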


# File 'lib/logstash/inputs/s3snssqs.rb', line 192

def register
  # prepare system
  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
  @id ||= "Unknown" #Use INPUT{ id => name} for thread identifier
  @credentials_by_bucket = hash_key_is_regex({})
  @region_by_bucket = hash_key_is_regex({})
  # create the bucket=>folder=>codec lookup from config options
  @codec_by_folder = hash_key_is_regex({})
  @type_by_folder = hash_key_is_regex({})

  # use deprecated settings only if new config is missing:
  if @s3_options_by_bucket.nil?
    # We don't know any bucket name, so we must rely on a "catch-all" regex
    s3_options = {
      'bucket_name' => '.*',
      'folders' => @set_codec_by_folder.map { |key, codec|
        { 'key' => key, 'codec' => codec }
      }
    }
    if @s3_role_arn.nil?
      # access key/secret key pair needed
      unless @s3_access_key_id.nil? || @s3_secret_access_key.nil?
        s3_options['credentials'] = {
          'access_key_id' => @s3_access_key_id,
          'secret_access_key' => @s3_secret_access_key
        }
      end
    else
      s3_options['credentials'] = {
        'role' => @s3_role_arn
      }
    end
    @s3_options_by_bucket = [s3_options]
  end

  @s3_options_by_bucket.each do |options|
    bucket = options['bucket_name']
    if options.key?('credentials')
      @credentials_by_bucket[bucket] = options['credentials']
    end
    if options.key?('region')
      @region_by_bucket[bucket] = options['region']
    end
    if options.key?('folders')
      # make these hashes do key lookups using regex matching
      folders = hash_key_is_regex({})
      types = hash_key_is_regex({})
      options['folders'].each do |entry|
        @logger.debug("options for folder ", :folder => entry)
        folders[entry['key']] = entry['codec'] if entry.key?('codec')
        types[entry['key']] = entry['type'] if entry.key?('type')
      end
      @codec_by_folder[bucket] = folders unless folders.empty?
      @type_by_folder[bucket] = types unless types.empty?
    end
  end

  @received_stop = Concurrent::AtomicBoolean.new(false)

  # instantiate helpers
  @sqs_poller = SqsPoller.new(@logger, @received_stop,
    {
      visibility_timeout: @visibility_timeout,
      skip_delete: @sqs_skip_delete,
      wait_time_seconds: @sqs_wait_time_seconds
    },
    {
      sqs_queue: @queue,
      queue_owner_aws_account_id: @queue_owner_aws_account_id,
      from_sns: @from_sns,
      max_processing_time: @max_processing_time,
      sqs_delete_on_failure: @sqs_delete_on_failure
    },
    aws_options_hash)
  @s3_client_factory = S3ClientFactory.new(@logger, {
    aws_region: @region,
    s3_default_options: @s3_default_options,
    s3_credentials_by_bucket: @credentials_by_bucket,
    s3_region_by_bucket: @region_by_bucket,
    s3_role_session_name: @s3_role_session_name
  }, aws_options_hash)
  @s3_downloader = S3Downloader.new(@logger, @received_stop, {
    s3_client_factory: @s3_client_factory,
    delete_on_success: @delete_on_success,
    move_to_bucket: @move_to_bucket,
    include_object_properties: @include_object_properties
  })
  @codec_factory = CodecFactory.new(@logger, {
    default_codec: @codec,
    codec_by_folder: @codec_by_folder
  })
  #@log_processor = LogProcessor.new(self)

  # administrative stuff
  @worker_threads = []
end
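The listing above calls `hash_key_is_regex`, whose definition is not shown on this page. Judging from the comment "make these hashes do key lookups using regex matching", one plausible way to get that behaviour is a hash whose default proc treats each stored key as a pattern. The sketch below is an assumption about how such a helper could work, under the hypothetical name `regex_keyed_hash`; it is not the plugin's actual implementation.

```ruby
# Hypothetical stand-in for hash_key_is_regex: when a lookup key is not
# stored literally, return the value of the first stored key that,
# interpreted as a regular expression, matches the lookup key.
def regex_keyed_hash(hash = {})
  hash.default_proc = lambda do |h, lookup|
    pair = h.find { |pattern, _value| Regexp.new(pattern).match?(lookup) }
    pair&.last
  end
  hash
end

codec_by_folder = regex_keyed_hash
codec_by_folder['^elb/'] = 'plain'
codec_by_folder['cloudtrail'] = 'json'
codec_by_folder['elb/2020/01/access.log']  # looked up via the '^elb/' pattern
```

With this approach the catch-all bucket name `'.*'` used for the deprecated settings matches any bucket, and insertion order decides which pattern wins when several match.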

#run(logstash_event_queue) ⇒ Object

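Startup: spawns one control thread per configured consumer thread; each control thread (re)starts its worker thread until the plugin is stopped.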


# File 'lib/logstash/inputs/s3snssqs.rb', line 290

def run(logstash_event_queue)
  @control_threads = @consumer_threads.times.map do |thread_id|
    Thread.new do
      restart_count = 0
      until stop?
        # start threads with a delay to avoid polling the same message from SQS
        sleep 0.5
        worker_thread = run_worker_thread(logstash_event_queue, thread_id)
        worker_thread.join
        restart_count += 1
        thread_id = "#{thread_id}_#{restart_count}"
        @logger.info("[control_thread] restarting a thread #{thread_id}... ", :thread => worker_thread.inspect)
      end
    end
  end
  @control_threads.each { |t| t.join }
end
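The control-thread pattern used by `run` can be tried out in isolation with the sketch below. Everything in it is illustrative: `StopFlag` is a hypothetical stdlib-only stand-in for the `Concurrent::AtomicBoolean` used by the plugin, and `supervise` with its `stagger` keyword is an invented name. Unlike the listing, the sketch keeps the base index separate from the restart counter, so worker ids read `0_0`, `0_1`, `0_2` rather than growing with every restart.

```ruby
# Hypothetical thread-safe stop flag (the plugin uses Concurrent::AtomicBoolean).
class StopFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  def make_true
    @mutex.synchronize { @value = true }
  end

  def true?
    @mutex.synchronize { @value }
  end
end

# Start `worker_count` control threads. Each one runs a worker thread,
# waits for it to finish and starts a replacement until the flag is set.
def supervise(worker_count, stop_flag, stagger: 0.5, &work)
  worker_count.times.map do |index|
    Thread.new do
      restarts = 0
      until stop_flag.true?
        sleep stagger # delay start-up so workers do not poll in lockstep
        worker_id = "#{index}_#{restarts}"
        worker = Thread.new do
          begin
            work.call(worker_id)
          rescue StandardError
            nil # a crashed worker is simply replaced on the next iteration
          end
        end
        worker.join
        restarts += 1
      end
    end
  end
end
```

Joining the returned control threads, as `run` does, blocks the caller until the flag has been set and the last worker has finished.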

#stop ⇒ Object

Shutdown: sets the stop flag and wakes up all worker threads; a thread that cannot be woken up is killed.


# File 'lib/logstash/inputs/s3snssqs.rb', line 309

def stop
  @received_stop.make_true

  unless @worker_threads.nil?
    @worker_threads.each do |worker|
      begin
        @logger.info("Stopping thread ... ", :thread => worker.inspect)
        worker.wakeup
      rescue
        @logger.error("Cannot stop thread ... trying to kill it", :thread => worker.inspect)
        worker.kill
      end
    end
  end
end
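The `rescue` branch in `stop` matters because `Thread#wakeup` raises a `ThreadError` when the target thread is already dead. The sketch below isolates that wake-then-kill fallback; the method name `wake_or_kill` and its symbolic return values are hypothetical and only serve the illustration.

```ruby
# Hypothetical helper: try to wake a sleeping thread and fall back to
# killing it when it cannot be woken up (e.g. because it is already dead).
def wake_or_kill(thread)
  thread.wakeup
  :woken
rescue ThreadError
  thread.kill
  :killed
end

sleeper = Thread.new { sleep } # sleeps until it is woken up
Thread.pass until sleeper.status == 'sleep'
wake_or_kill(sleeper)
sleeper.join
```

Waking a thread only interrupts its current `sleep`; the thread still has to check the stop flag itself, which is what `stop?` is for.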

#stop? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/logstash/inputs/s3snssqs.rb', line 325

def stop?
  @received_stop.value
end