Class: Fluent::Plugin::SQSInput
- Inherits: Input
  - Object
  - Input
  - Fluent::Plugin::SQSInput
- Defined in: lib/fluent/plugin/in_sqs.rb
Instance Method Summary
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #queue ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#client ⇒ Object
33 34 35 36 37 38 39 40 |
# File 'lib/fluent/plugin/in_sqs.rb', line 33
#
# Returns the SQS API client, building it on first use and reusing the same
# instance afterwards. The client is configured from the @aws_key_id,
# @aws_sec_key, @region and @stub_responses instance variables.
#
# @return [Aws::SQS::Client] the memoized client
def client
  @client ||= begin
    options = {
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key,
      region: @region,
      stub_responses: @stub_responses
    }
    Aws::SQS::Client.new(**options)
  end
end
#configure(conf) ⇒ Object
23 24 25 |
# File 'lib/fluent/plugin/in_sqs.rb', line 23
#
# Configuration hook. Adds no behavior of its own: the configuration object
# is handed unchanged to the parent implementation.
#
# @param conf the configuration passed in by the caller
def configure(conf)
  super
end
#queue ⇒ Object
42 43 44 45 46 47 |
# File 'lib/fluent/plugin/in_sqs.rb', line 42
#
# Returns the queue resource, building it on first use and reusing the same
# instance afterwards. The queue URL is looked up through #client using
# @queue_name and @queue_owner_aws_account_id, and the resulting resource is
# bound to that same client.
#
# @return [Aws::SQS::Queue] the memoized queue resource
def queue
  @queue ||= begin
    lookup = client.get_queue_url(
      queue_name: @queue_name,
      queue_owner_aws_account_id: @queue_owner_aws_account_id
    )
    Aws::SQS::Queue.new(url: lookup.queue_url, client: client)
  end
end
#run ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/in_sqs.rb', line 53
#
# Performs one polling pass: receives a batch of messages from #queue,
# converts each message to a record, deletes the message when
# @delete_message is set, and emits the record under @tag with the current
# engine time.
#
# Any StandardError raised while receiving or emitting is logged (message,
# class and backtrace) and is not re-raised.
#
# NOTE(review): the identifiers `receive_messages`, `message` and
# `parse_message` were blank in the extracted listing. They are restored here
# to match the AWS SDK queue resource API and the plugin's message-parsing
# helper -- confirm against the upstream file.
def run
  batch = queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout
  )

  batch.each do |message|
    record = parse_message(message)

    # The message is deleted before the record is emitted, as in the
    # original statement order.
    message.delete if @delete_message

    router.emit(@tag, Fluent::Engine.now, record)
  end
rescue StandardError => e
  # Bind the exception locally instead of reading $ERROR_INFO, which is only
  # aliased to $! when the `English` library has been required.
  log.error 'failed to emit or receive', error: e.to_s, error_class: e.class.to_s
  log.warn_backtrace e.backtrace
end
#shutdown ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin/in_sqs.rb', line 49
#
# Shutdown hook. Adds no behavior of its own and simply defers to the parent
# implementation.
def shutdown
  super
end
#start ⇒ Object
27 28 29 30 31 |
# File 'lib/fluent/plugin/in_sqs.rb', line 27
#
# Start hook. Runs the parent implementation first, then registers #run with
# timer_execute under the name :in_sqs_run_periodic_timer, passing
# @receive_interval as the interval argument.
def start
  super

  poll = method(:run)
  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &poll)
end