Class: Fluent::Plugin::SQSInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sqs.rb

Instance Method Summary collapse

Instance Method Details

#client ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/in_sqs.rb', line 33

# Returns the SQS API client, building it on first use and reusing it after.
#
# The client is created from the plugin's @aws_key_id, @aws_sec_key, @region
# and @stub_responses instance variables.
#
# @return [Aws::SQS::Client] the cached client instance
def client
  return @client if @client

  options = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region,
    stub_responses: @stub_responses
  }
  @client = Aws::SQS::Client.new(**options)
end

#configure(conf) ⇒ Object



23
24
25
# File 'lib/fluent/plugin/in_sqs.rb', line 23

# Configuration hook. This plugin adds no configuration handling of its own
# and hands +conf+ straight to the superclass implementation.
#
# @param conf the configuration object, passed through unchanged
# @return whatever the superclass implementation returns
def configure(conf)
  super(conf)
end

#queue ⇒ Object



42
43
44
45
46
47
# File 'lib/fluent/plugin/in_sqs.rb', line 42

# Returns the SQS queue resource, resolving it on first use and reusing it
# after.
#
# The queue URL is looked up through #client from @queue_name and
# @queue_owner_aws_account_id, and the same client is attached to the
# resulting queue object.
#
# @return [Aws::SQS::Queue] the cached queue resource
def queue
  return @queue if @queue

  lookup = client.get_queue_url(
    queue_name: @queue_name,
    queue_owner_aws_account_id: @queue_owner_aws_account_id
  )
  @queue = Aws::SQS::Queue.new(url: lookup.queue_url, client: client)
end

#run ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/in_sqs.rb', line 53

# Receives one batch of messages from the queue and emits each as an event.
#
# Every message is parsed with #parse_message and emitted under @tag with the
# current engine time. When @delete_message is set the message is removed
# from the queue, but only after the emit has succeeded, so a failing emit
# does not discard a message that was never delivered.
#
# Any StandardError raised while receiving, parsing, emitting or deleting is
# logged (message, class and backtrace) instead of being propagated to the
# caller. Messages after the failing one in the same batch are not processed.
def run
  messages = queue.receive_messages(
    max_number_of_messages: @max_number_of_messages,
    wait_time_seconds: @wait_time_seconds,
    visibility_timeout: @visibility_timeout
  )

  messages.each do |message|
    record = parse_message(message)

    router.emit(@tag, Fluent::Engine.now, record)

    # Delete last: removing the message before the emit would lose it for
    # good if the emit raised.
    message.delete if @delete_message
  end
rescue StandardError => e
  # Bind the exception explicitly rather than reading $ERROR_INFO, which is
  # nil unless the 'English' library has been required.
  log.error 'failed to emit or receive', error: e.to_s, error_class: e.class.to_s
  log.warn_backtrace e.backtrace
end

#shutdown ⇒ Object



49
50
51
# File 'lib/fluent/plugin/in_sqs.rb', line 49

# Shutdown hook. This plugin has no teardown work of its own and simply
# delegates to the superclass implementation.
#
# @return whatever the superclass implementation returns
def shutdown
  super()
end

#start ⇒ Object



27
28
29
30
31
# File 'lib/fluent/plugin/in_sqs.rb', line 27

# Start hook. After the superclass has started, registers a timer named
# :in_sqs_run_periodic_timer that invokes #run, using @receive_interval as
# the timer interval.
#
# @return whatever timer_execute returns
def start
  super

  periodic_run = method(:run)
  timer_execute(:in_sqs_run_periodic_timer, @receive_interval, &periodic_run)
end