Class: LogStash::Inputs::SQS

Inherits:
Threadable show all
Includes:
PluginMixins::AwsConfig
Defined in:
lib/logstash/inputs/sqs.rb

Overview

Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like and how to setup a queue.

To use this plugin, you must:

* Have an AWS account
* Setup an SQS queue
* Create an identify that has access to consume messages from the queue.

The “consumer” identity must have the following permissions on the queue:

* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:DeleteMessage
* sqs:DeleteMessageBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:ReceiveMessage

Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. A sample policy is as follows:

{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}

See aws.amazon.com/iam/ for more details on setting up AWS identities.

Constant Summary

Constants included from PluginMixins::AwsConfig

PluginMixins::AwsConfig::US_EAST_1

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes inherited from Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods included from PluginMixins::AwsConfig

#aws_options_hash, included, #setup_aws_config

Methods inherited from Threadable

#initialize

Methods inherited from Base

#initialize, #tag

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Inputs::Threadable

Instance Method Details

#aws_service_endpoint(region) ⇒ Object



80
81
82
83
84
# File 'lib/logstash/inputs/sqs.rb', line 80

def aws_service_endpoint(region)
  return {
      :sqs_endpoint => "sqs.#{region}.amazonaws.com"
  }
end

#registerObject



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/logstash/inputs/sqs.rb', line 87

def register
  @logger.info("Registering SQS input", :queue => @queue)
  require "aws-sdk"

  @sqs = AWS::SQS.new(aws_options_hash)

  begin
    @logger.debug("Connecting to AWS SQS queue", :queue => @queue)
    @sqs_queue = @sqs.queues.named(@queue)
    @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue)
  rescue Exception => e
    @logger.error("Unable to access SQS queue.", :error => e.to_s, :queue => @queue)
    throw e
  end # begin/rescue
end

#run(output_queue) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/logstash/inputs/sqs.rb', line 104

def run(output_queue)
  @logger.debug("Polling SQS queue", :queue => @queue)

  receive_opts = {
      :limit => 10,
      :visibility_timeout => 30,
      :attributes => [:sent_at]
  }

  continue_polling = true
  while running? && continue_polling
    continue_polling = run_with_backoff(60, 1) do
      @sqs_queue.receive_message(receive_opts) do |message|
        if message
          @codec.decode(message.body) do |event|
            decorate(event)
            if @id_field
              event[@id_field] = message.id
            end
            if @md5_field
              event[@md5_field] = message.md5
            end
            if @sent_timestamp_field
              event[@sent_timestamp_field] = message.sent_timestamp.utc
            end
            @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue)
            output_queue << event
            message.delete
          end # codec.decode
        end # valid SQS message
      end # receive_message
    end # run_with_backoff
  end # polling loop
end

#teardownObject

def run



139
140
141
142
# File 'lib/logstash/inputs/sqs.rb', line 139

def teardown
  @sqs_queue = nil
  finished
end