Class: LogStash::Inputs::S3SNSSQS
- Inherits: Threadable (Object → Threadable → LogStash::Inputs::S3SNSSQS)
- Includes: LogProcessor, PluginMixins::AwsConfig::V2
- Defined in: lib/logstash/inputs/s3snssqs.rb
Overview
Get logs from AWS s3 buckets, as announced by object-created events via sqs.
This plugin is based on the logstash-input-sqs plugin, but it does not emit the sqs event itself. Instead it assumes that the event is an s3 object-created event and will download and process the referenced file.
Some issues of logstash-input-sqs, like logstash not shutting down properly, have been fixed in this plugin.
In contrast to logstash-input-sqs, this plugin uses the "Receive Message Wait Time" configured for the sqs queue in question; a value of about 10 seconds is good, to ensure a reasonable shutdown time for logstash. Also use a "Default Visibility Timeout" that is high enough for log files to be downloaded and processed (5 to 10 minutes should fit most use cases): the plugin avoids removing the event from the queue if the associated log file could not be correctly passed to the processing level of logstash (e.g. the downloaded content size does not match the sqs event).
This plugin is meant for high-availability setups. In contrast to logstash-input-s3, you can safely run multiple logstash nodes, since the use of sqs ensures that each logfile is processed only once and no file gets lost on node failure or on downscaling in auto-scaling groups. (Use a "Message Retention Period" of at least 4 days for your sqs queue, so that you can survive a weekend of faulty log file processing.) The plugin will not delete objects from s3 buckets, so make sure to configure a reasonable "Lifecycle" for your buckets, which should keep the files at least as long as the "Message Retention Period".
A typical setup will contain some s3 buckets holding elb, cloudtrail or other log files. These are configured to send object-created events to an sqs queue, which is configured as the source queue for this plugin. (The plugin supports gzipped content if it is marked with "content-encoding: gzip", as is the case for cloudtrail logs.)
The logstash node therefore needs sqs permissions plus permissions to download objects from the s3 buckets that send events to the queue. (If the logstash nodes are running on EC2 you should use an IAM instance role to provide these permissions.) For example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:Get*",
        "sqs:List*",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility*",
        "sqs:DeleteMessage*"
      ],
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-elb-logs",
        "arn:aws:s3:::my-elb-logs/*"
      ]
    }
  ]
}
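With these permissions in place, a minimal input configuration could look like the following sketch. Queue name, account id and region are placeholders, and the option names are inferred from the plugin settings used in #register below, so check them against your installed plugin version:

input {
  s3snssqs {
    region                     => "us-east-1"
    queue                      => "my-elb-log-queue"
    queue_owner_aws_account_id => "123456789012"
    from_sns                   => true            # events arrive wrapped in sns notifications
    temporary_directory        => "/tmp/logstash" # download buffer for fetched s3 objects
  }
}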
Instance Method Summary

- #register ⇒ Object: initialisation.
- #run(logstash_event_queue) ⇒ Object: startup.
- #stop ⇒ Object: shutdown.
- #stop? ⇒ Boolean: whether a stop has been requested.
Methods included from LogProcessor
Instance Method Details
#register ⇒ Object
initialisation
# File 'lib/logstash/inputs/s3snssqs.rb', line 192

def register
  # prepare system
  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
  @id ||= "Unknown" # use input { id => name } for thread identifier
  @credentials_by_bucket = hash_key_is_regex({})
  @region_by_bucket = hash_key_is_regex({})
  # create the bucket=>folder=>codec lookup from config options
  @codec_by_folder = hash_key_is_regex({})
  @type_by_folder = hash_key_is_regex({})

  # use deprecated settings only if new config is missing:
  if @s3_options_by_bucket.nil?
    # We don't know any bucket name, so we must rely on a "catch-all" regex
    s3_options = {
      'bucket_name' => '.*',
      'folders' => @set_codec_by_folder.map { |key, codec|
        { 'key' => key, 'codec' => codec }
      }
    }
    if @s3_role_arn.nil?
      # access key/secret key pair needed
      unless @s3_access_key_id.nil? or @s3_secret_access_key.nil?
        s3_options['credentials'] = {
          'access_key_id' => @s3_access_key_id,
          'secret_access_key' => @s3_secret_access_key
        }
      end
    else
      s3_options['credentials'] = { 'role' => @s3_role_arn }
    end
    @s3_options_by_bucket = [s3_options]
  end

  @s3_options_by_bucket.each do |options|
    bucket = options['bucket_name']
    if options.key?('credentials')
      @credentials_by_bucket[bucket] = options['credentials']
    end
    if options.key?('region')
      @region_by_bucket[bucket] = options['region']
    end
    if options.key?('folders')
      # make these hashes do key lookups using regex matching
      folders = hash_key_is_regex({})
      types = hash_key_is_regex({})
      options['folders'].each do |entry|
        @logger.debug("options for folder ", :folder => entry)
        folders[entry['key']] = entry['codec'] if entry.key?('codec')
        types[entry['key']] = entry['type'] if entry.key?('type')
      end
      @codec_by_folder[bucket] = folders unless folders.empty?
      @type_by_folder[bucket] = types unless types.empty?
    end
  end

  @received_stop = Concurrent::AtomicBoolean.new(false)

  # instantiate helpers
  @sqs_poller = SqsPoller.new(@logger, @received_stop,
    {
      visibility_timeout: @visibility_timeout,
      skip_delete: @sqs_skip_delete,
      wait_time_seconds: @sqs_wait_time_seconds
    },
    {
      sqs_queue: @queue,
      queue_owner_aws_account_id: @queue_owner_aws_account_id,
      from_sns: @from_sns,
      max_processing_time: @max_processing_time,
      sqs_delete_on_failure: @sqs_delete_on_failure
    },
    @aws_options_hash
  )
  @s3_client_factory = S3ClientFactory.new(@logger, {
    aws_region: @region,
    s3_default_options: @s3_default_options,
    s3_credentials_by_bucket: @credentials_by_bucket,
    s3_region_by_bucket: @region_by_bucket,
    s3_role_session_name: @s3_role_session_name
  }, @aws_options_hash)
  @s3_downloader = S3Downloader.new(@logger, @received_stop, {
    s3_client_factory: @s3_client_factory,
    delete_on_success: @delete_on_success,
    move_to_bucket: @move_to_bucket,
    include_object_properties: @include_object_properties
  })
  @codec_factory = CodecFactory.new(@logger, {
    default_codec: @codec,
    codec_by_folder: @codec_by_folder
  })
  #@log_processor = LogProcessor.new(self)

  # administrative stuff
  @worker_threads = []
end
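The per-bucket lookup tables built above are populated from the s3_options_by_bucket setting. A hedged sketch of such a configuration follows; the bucket name, role ARN and folder pattern are placeholders, while the hash keys (bucket_name, region, credentials, folders, key, codec) match the ones parsed in #register:

input {
  s3snssqs {
    queue                => "my-elb-log-queue"
    s3_options_by_bucket => [
      {
        "bucket_name" => "my-elb-logs"   # matched as a regex against the event's bucket
        "region"      => "us-east-1"
        "credentials" => { "role" => "arn:aws:iam::123456789012:role/logstash-s3-read" }
        "folders"     => [
          { "key" => ".*cloudtrail.*" "codec" => "json" }  # per-folder codec override
        ]
      }
    ]
  }
}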
#run(logstash_event_queue) ⇒ Object
startup
# File 'lib/logstash/inputs/s3snssqs.rb', line 290

def run(logstash_event_queue)
  @control_threads = @consumer_threads.times.map do |thread_id|
    Thread.new do
      restart_count = 0
      while not stop?
        # make thread start async to prevent polling the same message from sqs
        sleep 0.5
        worker_thread = run_worker_thread(logstash_event_queue, thread_id)
        worker_thread.join
        restart_count += 1
        thread_id = "#{thread_id}_#{restart_count}"
        @logger.info("[control_thread] restarting a thread #{thread_id}... ", :thread => worker_thread.inspect)
      end
    end
  end
  @control_threads.each { |t| t.join }
end
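Each control thread restarts its worker whenever the worker dies, so polling survives transient failures. The number of control threads is taken from @consumer_threads, which presumably corresponds to a consumer_threads option; a hedged example:

input {
  s3snssqs {
    queue            => "my-elb-log-queue"  # placeholder queue name
    consumer_threads => 4                   # one sqs-polling worker per control thread
  }
}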
#stop ⇒ Object
shutdown
# File 'lib/logstash/inputs/s3snssqs.rb', line 309

def stop
  @received_stop.make_true
  unless @worker_threads.nil?
    @worker_threads.each do |worker|
      begin
        @logger.info("Stopping thread ... ", :thread => worker.inspect)
        worker.wakeup
      rescue
        @logger.error("Cannot stop thread ... try to kill him", :thread => worker.inspect)
        worker.kill
      end
    end
  end
end
#stop? ⇒ Boolean
# File 'lib/logstash/inputs/s3snssqs.rb', line 325

def stop?
  @received_stop.value
end