Class: Fluent::Plugin::CloudwatchIngestInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudwatch_ingest.rb

Defined Under Namespace

Classes: State

Instance Method Summary collapse

Constructor Details

#initializeCloudwatchIngestInput



71
72
73
74
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 71

def initialize
  super
  log.info('Starting fluentd-plugin-cloudwatch-ingest')
end

Instance Method Details

#configure(conf) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 85

def configure(conf)
  super
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, '<parse> section is required.'
  end
  unless parser_config['expression']
    raise Fluent::ConfigError, 'parse/expression is required.'
  end
  unless parser_config['event_time']
    raise Fluent::ConfigError, 'parse/event_time is required.'
  end

  # Configure telemetry, if enabled
  @statsd = Statsd.new @statsd_endpoint, 8125 if @telemetry

  # Fixup deprecated options
  if @api_interval
    @error_interval = @api_interval
    log.warn('api_interval is deprecated for error_interval')
  end

  @parser = parser_create(conf: parser_config)
  log.info('Configured fluentd-plugin-cloudwatch-ingest')

  @log_streams_next_token = nil
end

#metric(method, name, value = 0) ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 76

def metric(method, name, value = 0)
  case method
  when :increment
    @statsd.send(method, name) if @telemetry
  else
    @statsd.send(method, name, value) if @telemetry
  end
end

#shutdownObject



140
141
142
143
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 140

def shutdown
  @finished = true
  @thread.join
end

#startObject



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 114

def start
  super
  log.info('Started fluentd-plugin-cloudwatch-ingest')

  # Get a handle to Cloudwatch
  aws_options = {}
  Aws.config[:region] = @region
  Aws.config[:logger] = log if @aws_logging
  log.info("Working in region #{@region}")

  if @sts_enabled
    aws_options[:credentials] = Aws::AssumeRoleCredentials.new(
      role_arn: @sts_arn,
      role_session_name: @sts_session_name
    )

    log.info('Using STS for authentication with source account ARN: '\
             "#{@sts_arn}, session name: #{@sts_session_name}")
  else
    log.info('Using local instance IAM role for authentication')
  end
  @aws = Aws::CloudWatchLogs::Client.new(aws_options)
  @finished = false
  @thread = Thread.new(&method(:run))
end