Class: Fluent::Plugin::S3Input
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::S3Input
- Defined in:
- lib/fluent/plugin/in_s3.rb,
lib/fluent/plugin/s3_extractor_lzo.rb,
lib/fluent/plugin/s3_extractor_lzma2.rb,
lib/fluent/plugin/s3_extractor_gzip_command.rb
Defined Under Namespace
Classes: Extractor, GzipCommandExtractor, GzipExtractor, JsonExtractor, LZMA2Extractor, LZOExtractor, TextExtractor
Constant Summary collapse
- DEFAULT_PARSE_TYPE =
"none"
Instance Attribute Summary collapse
-
#bucket ⇒ Object
readonly
Returns the value of attribute bucket.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ S3Input
constructor
A new instance of S3Input.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ S3Input
Returns a new instance of S3Input.
18 19 20 21 |
# File 'lib/fluent/plugin/in_s3.rb', line 18 def initialize super @extractor = nil end |
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
118 119 120 |
# File 'lib/fluent/plugin/in_s3.rb', line 118 def bucket @bucket end |
Instance Method Details
#configure(conf) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/fluent/plugin/in_s3.rb', line 120 def configure(conf) super if @s3_endpoint && (@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) }) raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services" end if @sqs.endpoint && (@sqs.endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @sqs.endpoint.include?(e) }) raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services" end parser_config = conf.elements("parse").first unless @sqs.queue_name raise Fluent::ConfigError, "sqs/queue_name is required" end Aws.use_bundled_cert! if @use_bundled_cert @extractor = EXTRACTOR_REGISTRY.lookup(@store_as).new(log: log) @extractor.configure(conf) @parser = parser_create(conf: parser_config, default_type: DEFAULT_PARSE_TYPE) end |
#shutdown ⇒ Object
169 170 171 172 |
# File 'lib/fluent/plugin/in_s3.rb', line 169 def shutdown @running = false super end |
#start ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/fluent/plugin/in_s3.rb', line 144 def start super s3_client = create_s3_client log.debug("Succeeded to create S3 client") @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) raise "#{@bucket.name} is not found." unless @bucket.exists? check_apikeys if @check_apikey_on_start sqs_client = create_sqs_client log.debug("Succeeded to create SQS client") response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id) sqs_queue_url = response.queue_url log.debug("Succeeded to get SQS queue URL") @include_file_regex = Regexp.new(@sqs.include_file_regex) @poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client) @running = true thread_create(:in_s3, &method(:run)) end |