Class: LogStash::Outputs::SQS

Inherits:
Base
Includes:
PluginMixins::AwsConfig, Stud::Buffer
Defined in:
lib/logstash/outputs/sqs.rb

Overview

Push events to an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems such as AMQP, it uses a custom API and requires an AWS account. See aws.amazon.com/sqs/ for details on how SQS works, its pricing, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to publish messages to the queue

The publishing identity must have the following permissions on the queue:

* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:SendMessage
* sqs:SendMessageBatch

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:

{
  "Statement": [
    {
      "Sid": "Stmt1347986764948",
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:200850199751:Logstash"
      ]
    }
  ]
}

See aws.amazon.com/iam/ for more details on setting up AWS identities.
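
The permission list above can also be assembled into a policy document programmatically. The following is a minimal sketch only: the helper name `sqs_output_policy` and the queue ARN are hypothetical and not part of this plugin.

```ruby
require "json"

# Actions listed in the overview as required for the publishing identity.
SQS_OUTPUT_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:SendMessage
  sqs:SendMessageBatch
].freeze

# Hypothetical helper (not part of the plugin): builds an IAM policy
# document granting the actions above on a single queue ARN.
def sqs_output_policy(queue_arn)
  {
    "Statement" => [
      {
        "Action"   => SQS_OUTPUT_ACTIONS,
        "Effect"   => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

# The ARN below is a placeholder; substitute your own account and queue.
puts JSON.pretty_generate(
  sqs_output_policy("arn:aws:sqs:us-east-1:123456789012:Logstash")
)
```

Generating the document from one list keeps the policy and the documented permissions from drifting apart.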

Constant Summary

Constants included from PluginMixins::AwsConfig

PluginMixins::AwsConfig::US_EAST_1

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary

Methods included from PluginMixins::AwsConfig

#aws_options_hash, included, #setup_aws_config

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Instance Method Details

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 82

def aws_service_endpoint(region)
  return {
      :sqs_endpoint => "sqs.#{region}.amazonaws.com"
  }
end
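
As an illustration of the return value, the sketch below repeats the method body as a standalone function (outside the plugin class, for demonstration only) and calls it with an example region name.

```ruby
# Standalone copy of the method shown above, for illustration only.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

endpoint = aws_service_endpoint("eu-west-1")
puts endpoint[:sqs_endpoint]  # prints "sqs.eu-west-1.amazonaws.com"
```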

#flush(events, teardown = false) ⇒ Object

Called from Stud::Buffer#buffer_flush when there are events to flush.



# File 'lib/logstash/outputs/sqs.rb', line 131

def flush(events, teardown=false)
  @sqs_queue.batch_send(events)
end

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 122

def receive(event)
  if @batch
    buffer_receive(event.to_json)
    return
  end
  @sqs_queue.send_message(event.to_json)
end
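
The interaction between #receive and #flush can be sketched without AWS access. Everything below is an assumption made for illustration: `FakeQueue` and `SketchOutput` are hypothetical stand-ins, and the buffering is a simplified model that flushes on size only, whereas the real Stud::Buffer is also configured with a `:max_interval`.

```ruby
require "json"

# Stand-in for the SQS queue object; records calls instead of contacting AWS.
class FakeQueue
  attr_reader :sent, :batches

  def initialize
    @sent = []
    @batches = []
  end

  def send_message(body)
    @sent << body
  end

  def batch_send(bodies)
    @batches << bodies.dup
  end
end

# Simplified model of the output (hypothetical, size-based buffering only).
class SketchOutput
  def initialize(queue, batch: false, batch_events: 10)
    @sqs_queue = queue
    @batch = batch
    @batch_events = batch_events
    @pending = []
  end

  # Mirrors #receive: buffer when batching, otherwise send immediately.
  def receive(event)
    if @batch
      @pending << event.to_json
      flush(@pending.shift(@batch_events)) if @pending.size >= @batch_events
      return
    end
    @sqs_queue.send_message(event.to_json)
  end

  # Mirrors #flush: hand the buffered payloads to the queue in one call.
  def flush(events, teardown = false)
    @sqs_queue.batch_send(events)
  end
end

queue = FakeQueue.new
output = SketchOutput.new(queue, batch: true, batch_events: 2)
3.times { |i| output.receive("message" => "event #{i}") }
# Two events went out as one batch; the third is still buffered.
puts queue.batches.length
```

In the plugin itself, any events still buffered at shutdown are flushed by #teardown through `buffer_flush(:final => true)`.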

#register ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 89

def register
  require "aws-sdk"

  @sqs = AWS::SQS.new(aws_options_hash)

  if @batch
    if @batch_events > 10
      raise RuntimeError.new(
        "AWS only allows a batch_events parameter of 10 or less"
      )
    elsif @batch_events <= 1
      raise RuntimeError.new(
        "batch_events parameter must be greater than 1 (or its not a batch)"
      )
    end
    buffer_initialize(
      :max_items => @batch_events,
      :max_interval => @batch_timeout,
      :logger => @logger
    )
  end

  begin
    @logger.debug("Connecting to AWS SQS queue '#{@queue}'...")
    @sqs_queue = @sqs.queues.named(@queue)
  rescue Exception => e
    @logger.error("Unable to access SQS queue '#{@queue}': #{e.to_s}")
  end # begin/rescue

  @logger.info("Connected to AWS SQS queue '#{@queue}' successfully.")
end
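
The `batch_events` bounds enforced in #register accept only values from 2 through 10. The sketch below isolates that check in a hypothetical helper, `validate_batch_events!`, which does not exist in the plugin, to show which values pass.

```ruby
# Hypothetical standalone helper mirroring the batch_events checks in #register.
def validate_batch_events!(batch_events)
  if batch_events > 10
    raise RuntimeError, "AWS only allows a batch_events parameter of 10 or less"
  elsif batch_events <= 1
    raise RuntimeError, "batch_events parameter must be greater than 1 (or it's not a batch)"
  end
  batch_events
end

# Try values on both sides of each bound.
[1, 2, 10, 11].each do |n|
  begin
    validate_batch_events!(n)
    puts "#{n}: accepted"
  rescue RuntimeError => e
    puts "#{n}: rejected (#{e.message})"
  end
end
```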

#teardown ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 136

def teardown
  buffer_flush(:final => true)
  @sqs_queue = nil
  finished
end