Class: Fluent::SQSInput
- Inherits:
-
Input
- Object
- Input
- Fluent::SQSInput
- Defined in:
- lib/fluent/plugin/in_sqs.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ SQSInput
constructor
A new instance of SQSInput.
- #run_periodic ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ SQSInput
Returns a new instance of SQSInput.
8 9 10 |
# File 'lib/fluent/plugin/in_sqs.rb', line 8 def initialize super end |
Instance Method Details
#configure(conf) ⇒ Object
20 21 22 23 |
# File 'lib/fluent/plugin/in_sqs.rb', line 20 def configure(conf) super end |
#run_periodic ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/in_sqs.rb', line 46 def run_periodic until @finished begin sleep @receive_interval @queue.(:limit => @max_number_of_messages) do || record = {} record['body'] = .body.to_s record['handle'] = .handle.to_s record['id'] = .id.to_s record['md5'] = .md5.to_s record['url'] = .queue.url.to_s record['sender_id'] = .sender_id.to_s Engine.emit(@tag, Time.now.to_i, record) end rescue $log.error "failed to emit or receive", :error => $!.to_s, :error_class => $!.class.to_s $log.warn_backtrace $!.backtrace end end end |
#shutdown ⇒ Object
39 40 41 42 43 44 |
# File 'lib/fluent/plugin/in_sqs.rb', line 39 def shutdown super @finished = true @thread.join end |
#start ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/in_sqs.rb', line 25 def start super AWS.config( :access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key ) @queue = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint).queues[@sqs_url] @finished = false @thread = Thread.new(&method(:run_periodic)) end |