Class: LogStash::Inputs::KinesisCloudWatchLogSubscription

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb

Overview

Receive events through an AWS Kinesis stream.

This input plugin uses the Java Kinesis Client Library underneath, so the documentation at github.com/awslabs/amazon-kinesis-client will be useful.

AWS credentials can be specified either through environment variables, or an IAM instance role. The library uses a DynamoDB table for worker coordination, so you’ll need to grant access to that as well as to the Kinesis stream. The DynamoDB table has the same name as the ‘application_name` configuration option, which defaults to “logstash”.

The library can optionally also send worker statistics to CloudWatch.

Constant Summary collapse

KCL =
com.amazonaws.services.kinesis.clientlibrary.lib.worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}, kcl_class = KCL::Worker) ⇒ KinesisCloudWatchLogSubscription

Returns a new instance of KinesisCloudWatchLogSubscription.


52
53
54
55
# File 'lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb', line 52

def initialize(params = {}, kcl_class = KCL::Worker)
  @kcl_class = kcl_class
  super(params)
end

Instance Attribute Details

#kcl_configObject (readonly)

Returns the value of attribute kcl_config.


30
31
32
# File 'lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb', line 30

def kcl_config
  @kcl_config
end

Instance Method Details

#registerObject


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb', line 57

def register
  # the INFO log level is extremely noisy in KCL
  org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis").
    logger.setLevel(java.util.logging::Level::WARNING)

  worker_id = java.util::UUID.randomUUID.to_s
  creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new()
  @kcl_config = KCL::KinesisClientLibConfiguration.new(
    @application_name,
    @kinesis_stream_name,
    creds,
    worker_id).
      withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON).
      withRegionName(@region)
end

#run(output_queue) ⇒ Object


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb', line 73

def run(output_queue)
  worker_factory = proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
  if metrics_factory
    @kcl_worker = @kcl_class.new(
      worker_factory,
      @kcl_config,
      metrics_factory)
  else
    @kcl_worker = @kcl_class.new(
      worker_factory,
      @kcl_config)
  end

  @kcl_worker.run()
end

#teardownObject


89
90
91
# File 'lib/logstash/inputs/kinesis_cloudwatch_log_subscription.rb', line 89

def teardown
  @kcl_worker.shutdown if @kcl_worker
end