Class: Fluent::Plugin::S3Input

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb

Defined Under Namespace

Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor

Constant Summary collapse

DEFAULT_PARSE_TYPE =
"none"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3Input

Returns a new instance of S3Input.



18
19
20
21
# File 'lib/fluent/plugin/in_s3.rb', line 18

# Builds the plugin instance.
#
# Delegates to the parent initializer first, then resets @extractor to nil;
# the real extractor is assigned later in #configure.
def initialize
  super
  @extractor = nil
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



118
119
120
# File 'lib/fluent/plugin/in_s3.rb', line 118

# Read-only accessor for @bucket.
#
# @bucket is assigned in #start from `@s3.bucket(@s3_bucket)`, so this
# returns nil until #start has run.
#
# @return [Object] the current value of @bucket
def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fluent/plugin/in_s3.rb', line 120

# Validates the plugin settings, then builds the extractor and the parser.
#
# Checks are performed in this order, and the first failure raises:
#   1. @s3_endpoint must not be a plain amazonaws.com endpoint.
#   2. @sqs.endpoint must not be a plain amazonaws.com endpoint.
#   3. @sqs.queue_name must be set.
#
# @param conf [Object] configuration element; it is queried with
#   `elements("parse")` and handed unchanged to the extractor's `configure`
# @raise [Fluent::ConfigError] when any of the checks above fails
def configure(conf)
  super

  # An endpoint is rejected when it ends with "amazonaws.com" and contains
  # neither "fips" nor "gov"; a nil endpoint is never rejected.
  stock_aws_endpoint = lambda do |endpoint|
    endpoint && endpoint.end_with?('amazonaws.com') && %w[fips gov].none? { |marker| endpoint.include?(marker) }
  end

  if stock_aws_endpoint.call(@s3_endpoint)
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  if stock_aws_endpoint.call(@sqs.endpoint)
    raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services"
  end

  # Only the first <parse> section is used; it may be nil when none is given.
  parse_section = conf.elements("parse").first
  raise Fluent::ConfigError, "sqs/queue_name is required" unless @sqs.queue_name

  Aws.use_bundled_cert! if @use_bundled_cert

  # The extractor implementation is selected by the @store_as setting.
  extractor_class = EXTRACTOR_REGISTRY.lookup(@store_as)
  @extractor = extractor_class.new(log: log)
  @extractor.configure(conf)

  @parser = parser_create(conf: parse_section, default_type: DEFAULT_PARSE_TYPE)
end

#shutdown ⇒ Object



169
170
171
172
# File 'lib/fluent/plugin/in_s3.rb', line 169

# Stops the plugin.
#
# Clears the @running flag (set to true by #start) before delegating to the
# parent's shutdown.
# NOTE(review): presumably the worker started via `thread_create(:in_s3, ...)`
# checks @running to exit its loop — `run` is not visible here; confirm.
def shutdown
  @running = false
  super
end

#start ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/fluent/plugin/in_s3.rb', line 144

# Connects to S3 and SQS and launches the worker thread.
#
# Steps, in order:
#   1. Create the S3 client/resource and resolve @bucket from @s3_bucket.
#   2. Fail if the bucket does not exist.
#   3. Optionally run `check_apikeys` (when @check_apikey_on_start is set).
#   4. Create the SQS client and resolve the queue URL.
#   5. Compile @sqs.include_file_regex into @include_file_regex.
#   6. Build the queue poller, set @running and start the :in_s3 thread
#      running #run.
#
# @raise [RuntimeError] when the configured bucket does not exist
def start
  super

  s3_client = create_s3_client
  log.debug("Succeeded to create S3 client")
  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  raise "#{@bucket.name} is not found." unless @bucket.exists?

  check_apikeys if @check_apikey_on_start

  sqs_client = create_sqs_client
  log.debug("Succeeded to create SQS client")
  # NOTE(review): the accessor name after `@sqs.` was truncated in this
  # listing; it is restored to match the SDK keyword it feeds — confirm
  # against the sqs config section.
  response = sqs_client.get_queue_url(
    queue_name: @sqs.queue_name,
    queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id
  )
  sqs_queue_url = response.queue_url
  log.debug("Succeeded to get SQS queue URL")
  @include_file_regex = Regexp.new(@sqs.include_file_regex)

  @poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client)

  # @running must be true before the worker thread starts.
  @running = true
  thread_create(:in_s3, &method(:run))
end