Class: Fluent::Plugin::CloudwatchIngestInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudwatch_ingest.rb

Defined Under Namespace

Classes: State

Instance Method Summary collapse

Constructor Details

#initializeCloudwatchIngestInput

Returns a new instance of CloudwatchIngestInput.



66
67
68
69
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 66

def initialize
  super
  log.info('Starting fluentd-plugin-cloudwatch-ingest')
end

Instance Method Details

#configure(conf) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 80

def configure(conf)
  super
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, '<parse> section is required.'
  end
  unless parser_config['expression']
    raise Fluent::ConfigError, 'parse/expression is required.'
  end
  unless parser_config['event_time']
    raise Fluent::ConfigError, 'parse/event_time is required.'
  end

  # Configure telemetry, if enabled
  @statsd = Statsd.new @statsd_endpoint, 8125 if @telemetry

  # Fixup deprecated options
  if @api_interval
    @error_interval = @api_interval
    log.warn('api_interval is deprecated for error_interval')
  end

  @parser = parser_create(conf: parser_config)
  log.info('Configured fluentd-plugin-cloudwatch-ingest')
end

#metric(method, name, value = 0) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 71

def metric(method, name, value = 0)
  case method
  when :increment
    @statsd.send(method, name) if @telemetry
  else
    @statsd.send(method, name, value) if @telemetry
  end
end

#shutdownObject



132
133
134
135
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 132

def shutdown
  @finished = true
  @thread.join
end

#startObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 107

def start
  super
  log.info('Started fluentd-plugin-cloudwatch-ingest')

  # Get a handle to Cloudwatch
  aws_options = {}
  Aws.config[:region] = @region
  Aws.config[:logger] = log if @aws_logging
  log.info("Working in region #{@region}")

  if @sts_enabled
    aws_options[:credentials] = Aws::AssumeRoleCredentials.new(
      role_arn: @sts_arn,
      role_session_name: @sts_session_name
    )

    log.info("Using STS for authentication with source account ARN: #{@sts_arn}, session name: #{@sts_session_name}") # rubocop:disable LineLength
  else
    log.info('Using local instance IAM role for authentication')
  end
  @aws = Aws::CloudWatchLogs::Client.new(aws_options)
  @finished = false
  @thread = Thread.new(&method(:run))
end