Class: LogStash::Outputs::SQS
Includes: PluginMixins::AwsConfig, Stud::Buffer
Defined in: lib/logstash/outputs/sqs.rb
Overview
Push events to an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to publish messages to the queue.
The publishing identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:SendMessage
* sqs:SendMessageBatch
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:
{
  "Statement": [
    {
      "Sid": "Stmt1347986764948",
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:200850199751:Logstash"
      ]
    }
  ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
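The permission list above can also be turned into a policy document programmatically. The following is a minimal sketch using only Ruby's standard `json` library; the helper name `sqs_output_policy`, the constant `SQS_OUTPUT_ACTIONS`, and the `Sid` value are illustrative assumptions and are not part of the plugin.

```ruby
require "json"

# Permissions the overview lists for the identity that publishes to the queue.
SQS_OUTPUT_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:SendMessage
  sqs:SendMessageBatch
].freeze

# Hypothetical helper (not part of the plugin): builds an IAM policy
# document that allows the listed actions on a single queue ARN.
def sqs_output_policy(queue_arn, sid = "LogstashSqsOutput")
  {
    "Statement" => [
      {
        "Sid"      => sid,
        "Action"   => SQS_OUTPUT_ACTIONS,
        "Effect"   => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

# Print the policy as JSON, ready to paste into the IAM console.
puts JSON.pretty_generate(
  sqs_output_policy("arn:aws:sqs:us-east-1:200850199751:Logstash")
)
```

Generating the document from one list keeps the documented permissions and the applied policy from drifting apart.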
Constant Summary
Constants included from PluginMixins::AwsConfig
PluginMixins::AwsConfig::US_EAST_1
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
- #aws_service_endpoint(region) ⇒ Object
- #flush(events, teardown = false) ⇒ Object
  Called from Stud::Buffer#buffer_flush when there are events to flush.
- #receive(event) ⇒ Object
- #register ⇒ Object
- #teardown ⇒ Object
Methods included from PluginMixins::AwsConfig
#aws_options_hash, included, #setup_aws_config
Methods inherited from Base
#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Outputs::Base
Instance Method Details
#aws_service_endpoint(region) ⇒ Object
# File 'lib/logstash/outputs/sqs.rb', line 82
def aws_service_endpoint(region)
  return {
    :sqs_endpoint => "sqs.#{region}.amazonaws.com"
  }
end
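As a quick illustration of what #aws_service_endpoint returns, here is a standalone copy of the method body run outside the plugin. The region names are ordinary AWS region identifiers chosen for the example.

```ruby
# Standalone copy of the method body, for illustration outside the plugin.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

%w[us-east-1 eu-west-1].each do |region|
  puts aws_service_endpoint(region)[:sqs_endpoint]
end
# prints:
#   sqs.us-east-1.amazonaws.com
#   sqs.eu-west-1.amazonaws.com
```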
#flush(events, teardown = false) ⇒ Object
Called from Stud::Buffer#buffer_flush when there are events to flush.
# File 'lib/logstash/outputs/sqs.rb', line 131
def flush(events, teardown=false)
  @sqs_queue.batch_send(events)
end
#receive(event) ⇒ Object
# File 'lib/logstash/outputs/sqs.rb', line 122
def receive(event)
  if @batch
    buffer_receive(event.to_json)
    return
  end
  @sqs_queue.send_message(event.to_json)
end
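When batching is enabled, #receive only queues the serialized event. The buffer later calls #flush with the accumulated events, and #teardown forces a final flush. The sketch below imitates that flow with a hypothetical `TinyBuffer` class. It is not Stud::Buffer's implementation: it covers the size-based trigger only, with no timer, retries, or thread safety, and it records batches in memory instead of sending them to SQS.

```ruby
require "json"

# TinyBuffer is a hypothetical stand-in for Stud::Buffer, reduced to the
# size-based flush trigger.
class TinyBuffer
  attr_reader :batches

  def initialize(max_items)
    @max_items = max_items
    @pending = []
    @batches = [] # records what would have been sent to the queue
  end

  # Mirrors the batched branch of #receive: queue the serialized event
  # and flush once max_items have accumulated.
  def receive(event)
    @pending << JSON.generate(event)
    flush if @pending.size >= @max_items
  end

  # Mirrors the final flush in #teardown: emit whatever is still pending.
  def flush
    return if @pending.empty?
    @batches << @pending
    @pending = []
  end
end

buffer = TinyBuffer.new(3)
5.times { |i| buffer.receive({ "message" => "event #{i}" }) }
buffer.flush # final flush, as teardown would do
puts buffer.batches.map(&:size).inspect
# prints: [3, 2]
```

The final flush matters: without it, the two events left over after the first full batch would never be sent.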
#register ⇒ Object
# File 'lib/logstash/outputs/sqs.rb', line 89
def register
  require "aws-sdk"

  @sqs = AWS::SQS.new()

  if @batch
    if @batch_events > 10
      raise RuntimeError.new(
        "AWS only allows a batch_events parameter of 10 or less"
      )
    elsif @batch_events <= 1
      raise RuntimeError.new(
        "batch_events parameter must be greater than 1 (or its not a batch)"
      )
    end
    buffer_initialize(
      :max_items => @batch_events,
      :max_interval => @batch_timeout,
      :logger => @logger
    )
  end

  begin
    @logger.debug("Connecting to AWS SQS queue '#{@queue}'...")
    @sqs_queue = @sqs.queues.named(@queue)
  rescue Exception => e
    @logger.error("Unable to access SQS queue '#{@queue}': #{e.to_s}")
  end # begin/rescue

  @logger.info("Connected to AWS SQS queue '#{@queue}' successfully.")
end
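The batch size checks in #register accept values from 2 through 10 inclusive. The upper bound reflects the SQS limit of 10 messages per batch request. The sketch below extracts those checks into a hypothetical standalone helper, `validate_batch_events!`, which is not part of the plugin, so the boundary values can be tried in isolation.

```ruby
# Hypothetical standalone helper mirroring the checks in #register.
# Returns the value unchanged when it is acceptable.
def validate_batch_events!(batch_events)
  if batch_events > 10
    raise RuntimeError.new(
      "AWS only allows a batch_events parameter of 10 or less"
    )
  elsif batch_events <= 1
    raise RuntimeError.new(
      "batch_events parameter must be greater than 1"
    )
  end
  batch_events
end

# Try the values on both sides of each boundary.
[1, 2, 10, 11].each do |n|
  begin
    validate_batch_events!(n)
    puts "#{n}: ok"
  rescue RuntimeError => e
    puts "#{n}: rejected (#{e.message})"
  end
end
```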
#teardown ⇒ Object
# File 'lib/logstash/outputs/sqs.rb', line 136
def teardown
  buffer_flush(:final => true)
  @sqs_queue = nil
  finished
end