Class: LogStash::Inputs::SQS
- Inherits:
- Threadable
- Hierarchy: Object → Plugin → Base → Threadable → LogStash::Inputs::SQS
- Includes:
- PluginMixins::AwsConfig
- Defined in:
- lib/logstash/inputs/sqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems such as AMQP, it uses a custom API and requires an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume messages from the queue.
The “consumer” identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:DeleteMessage
* sqs:DeleteMessageBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:ReceiveMessage
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to that user. A sample policy is as follows:
    {
      "Statement": [
        {
          "Action": [
            "sqs:ChangeMessageVisibility",
            "sqs:ChangeMessageVisibilityBatch",
            "sqs:DeleteMessage",
            "sqs:DeleteMessageBatch",
            "sqs:GetQueueAttributes",
            "sqs:GetQueueUrl",
            "sqs:ListQueues",
            "sqs:ReceiveMessage"
          ],
          "Effect": "Allow",
          "Resource": [
            "arn:aws:sqs:us-east-1:123456789012:Logstash"
          ]
        }
      ]
    }
See aws.amazon.com/iam/ for more details on setting up AWS identities.
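The permission list above can be checked mechanically against a policy document. The following is a minimal sketch, not part of the plugin: `missing_sqs_actions` and `EXAMPLE_POLICY` are hypothetical names introduced here, and the check only matches exact action names in "Allow" statements (it ignores wildcards, conditions, and "Deny" statements).

```ruby
require "json"
require "set"

# Actions the overview lists as required for the "consumer" identity.
REQUIRED_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper: returns the required actions that no "Allow"
# statement in the policy grants by exact name.
def missing_sqs_actions(policy_json)
  policy = JSON.parse(policy_json)
  # "Statement" may be a single object or an array of objects.
  statements = [policy["Statement"]].flatten.compact
  granted = statements
    .select { |s| s["Effect"] == "Allow" }
    .flat_map { |s| Array(s["Action"]) }
    .to_set
  REQUIRED_ACTIONS.reject { |action| granted.include?(action) }
end

# Deliberately incomplete policy, used only to exercise the helper.
EXAMPLE_POLICY = <<~JSON
  {
    "Statement": [
      {
        "Action": ["sqs:GetQueueUrl", "sqs:ListQueues", "sqs:ReceiveMessage"],
        "Effect": "Allow",
        "Resource": ["arn:aws:sqs:us-east-1:123456789012:Logstash"]
      }
    ]
  }
JSON

puts missing_sqs_actions(EXAMPLE_POLICY)
```

An empty result means every listed action is granted by name. IAM itself remains the authority on whether access is actually allowed.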
Constant Summary
Constants included from PluginMixins::AwsConfig
PluginMixins::AwsConfig::US_EAST_1
Instance Method Summary
- #aws_service_endpoint(region) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Methods included from PluginMixins::AwsConfig
#aws_options_hash, included, #setup_aws_config
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Inputs::Threadable
Instance Method Details
#aws_service_endpoint(region) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 80

    def aws_service_endpoint(region)
      return {
        :sqs_endpoint => "sqs.#{region}.amazonaws.com"
      }
    end
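To make the return value concrete, here is a standalone copy of the method called outside the plugin class. The region string "us-east-1" is chosen only for illustration; how the returned hash is merged into the AWS configuration is handled by PluginMixins::AwsConfig and is not shown on this page.

```ruby
# Standalone copy of the method, for illustration outside the plugin class.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

endpoint = aws_service_endpoint("us-east-1")
puts endpoint[:sqs_endpoint]  # prints "sqs.us-east-1.amazonaws.com"
```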
#register ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 87

    def register
      @logger.info("Registering SQS input", :queue => @queue)
      require "aws-sdk"
      @sqs = AWS::SQS.new()
      begin
        @logger.debug("Connecting to AWS SQS queue", :queue => @queue)
        @sqs_queue = @sqs.queues.named(@queue)
        @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue)
      rescue Exception => e
        @logger.error("Unable to access SQS queue.", :error => e.to_s, :queue => @queue)
        # Re-raise the exception; `throw` is for catch/throw control flow, not errors.
        raise e
      end # begin/rescue
    end
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 104

    def run(output_queue)
      @logger.debug("Polling SQS queue", :queue => @queue)

      receive_opts = {
        :limit => 10,
        :visibility_timeout => 30,
        :attributes => [:sent_at]
      }

      continue_polling = true
      while running? && continue_polling
        continue_polling = run_with_backoff(60, 1) do
          @sqs_queue.receive_message(receive_opts) do |message|
            if message
              @codec.decode(message.body) do |event|
                decorate(event)
                if @id_field
                  event[@id_field] = message.id
                end
                if @md5_field
                  event[@md5_field] = message.md5
                end
                if @sent_timestamp_field
                  event[@sent_timestamp_field] = message.sent_timestamp.utc
                end
                @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue)
                output_queue << event
                message.delete
              end # codec.decode
            end # valid SQS message
          end # receive_message
        end # run_with_backoff
      end # polling loop
    end
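The polling loop relies on `run_with_backoff(60, 1)`, whose body is not shown on this page. As a rough illustration of the idea only, a helper with a (max_time, sleep_time) signature might retry with a doubling delay until the delay would exceed the maximum. Everything below is an assumption rather than the plugin's code: `retry_with_backoff`, `ThrottledError`, and the injectable `sleeper` argument are hypothetical names.

```ruby
# Hypothetical error class standing in for whatever throttling error the
# AWS SDK raises; it is not part of the plugin or the SDK.
class ThrottledError < StandardError; end

# Calls the block. On ThrottledError, waits and retries with the delay
# doubled each time. Returns true on success, and false once the delay
# would exceed max_time or when any other StandardError is raised.
# `sleeper` is injectable so the sketch can run without real sleeping.
def retry_with_backoff(max_time, sleep_time, sleeper = Kernel.method(:sleep))
  while sleep_time <= max_time
    begin
      yield
      return true
    rescue ThrottledError
      sleeper.call(sleep_time)
      sleep_time *= 2
    rescue StandardError
      return false
    end
  end
  false
end

# Usage: fail three times, then succeed; record delays instead of sleeping.
delays = []
attempts = 0
ok = retry_with_backoff(60, 1, ->(seconds) { delays << seconds }) do
  attempts += 1
  raise ThrottledError if attempts < 4
end
puts ok              # prints "true"
puts delays.inspect  # prints "[1, 2, 4]"
```

Returning a boolean matches how the loop above uses the result: a false value assigned to `continue_polling` ends the `while` loop.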
#teardown ⇒ Object
    # File 'lib/logstash/inputs/sqs.rb', line 139

    def teardown
      @sqs_queue = nil
      finished
    end