Class: SqsPoller
- Inherits:
-
Object
- Object
- SqsPoller
- Defined in:
- lib/logstash/inputs/sqs/poller.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
queue poller options we want to set explicitly
{ # we query one message at a time, so we can ensure correct error # handling if we can't download a single file correctly # (we will throw :skip_delete if download size isn't correct to allow # for processing the event again later, so make sure to set a reasonable # "DefaultVisibilityTimeout" for your queue so that there's enough time # to process the log files!) max_number_of_messages: 1, visibility_timeout: 10, # long polling; by default we use the queue's setting. # A good value is 10 seconds to to balance between a quick logstash # shutdown and fewer api calls. wait_time_seconds: nil, #attribute_names: ["All"], # Receive all available built-in message attributes. #message_attribute_names: ["All"], # Receive any custom message attributes. skip_delete: false, }
- BACKOFF_SLEEP_TIME =
only needed in “run_with_backoff”:
1
- BACKOFF_FACTOR =
2
- MAX_TIME_BEFORE_GIVING_UP =
60
- EVENT_SOURCE =
only needed in “preprocess”:
'aws:s3'
- EVENT_TYPE =
'ObjectCreated'
Instance Method Summary collapse
-
#initialize(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash) ⇒ SqsPoller
constructor
initialization and setup happens once, outside the threads:.
-
#run ⇒ Object
this is called by every worker thread:.
Constructor Details
#initialize(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash) ⇒ SqsPoller
initialization and setup happens once, outside the threads:
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/logstash/inputs/sqs/poller.rb', line 38 def initialize(logger, stop_semaphore, = {}, = {}, ) @logger = logger @stopped = stop_semaphore @queue = [:sqs_queue] @from_sns = [:from_sns] @max_processing_time = [:max_processing_time] @sqs_delete_on_failure = [:sqs_delete_on_failure] @options = DEFAULT_OPTIONS.merge() begin @logger.info("Registering SQS input", :queue => @queue) sqs_client = Aws::SQS::Client.new() if uri?(@queue) queue_url = @queue else queue_url = sqs_client.get_queue_url({ queue_name: @queue, queue_owner_aws_account_id: [:queue_owner_aws_account_id] }).queue_url end @poller = Aws::SQS::QueuePoller.new(queue_url, :client => sqs_client ) @logger.info("[#{Thread.current[:name]}] connected to queue.", :queue_url => queue_url) rescue Aws::SQS::Errors::ServiceError => e @logger.error("Cannot establish connection to Amazon SQS", :error => e) raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials" end end |
Instance Method Details
#run ⇒ Object
this is called by every worker thread:
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/logstash/inputs/sqs/poller.rb', line 69 def run() # not (&block) - pass explicitly (use yield below) # per-thread timer to extend visibility if necessary extender = nil = (@options[:visibility_timeout] * 95).to_f / 100.0 new_visibility = 2 * @options[:visibility_timeout] # "shutdown handler": @poller.before_request do |_| if stop? # kill visibility extender thread if active? extender.kill if extender extender = nil @logger.warn('issuing :stop_polling on "stop?" signal', :queue => @queue) # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized throw :stop_polling end end run_with_backoff do = 0 #PROFILING @poller.poll(@options) do || += 1 #PROFILING = Process.clock_gettime(Process::CLOCK_MONOTONIC) #PROFILING # auto-increase the timeout if processing takes too long: poller_thread = Thread.current extender = Thread.new do while new_visibility < @max_processing_time do sleep begin @poller.(, new_visibility) @logger.warn("[#{Thread.current[:name]}] Extended visibility for a long running message", :visibility => new_visibility) if new_visibility > 600.0 new_visibility += rescue Aws::SQS::Errors::InvalidParameterValue => e @logger.debug("Extending visibility failed for message", :error => e) else @logger.debug("[#{Thread.current[:name]}] Extended visibility for message", :visibility => new_visibility) #PROFILING end end @logger.error("[#{Thread.current[:name]}] Maximum visibility reached! We will delete this message from queue!") @poller.() if @sqs_delete_on_failure poller_thread.kill end extender[:name] = "#{Thread.current[:name]}/extender" #PROFILING failed = false record_count = 0 begin = catch(:skip_delete) do preprocess() do |record| record_count += 1 extender[:name] = "#{Thread.current[:name]}/extender/#{record[:key]}" #PROFILING yield(record) end end rescue Exception => e @logger.warn("Error in poller loop", :error => e) @logger.warn("Backtrace:\n\t#{e.backtrace.join("\n\t")}") failed = true end = Process.clock_gettime(Process::CLOCK_MONOTONIC) #PROFILING unless @logger.debug("[#{Thread.current[:name]}] uncompleted message at the end of poller loop. We´ll throw skip_delete.", :message_count => ) extender.run if extender end # at this time the extender has either fired or is obsolete extender.kill if extender extender = nil throw :skip_delete if failed or ! #@logger.info("[#{Thread.current[:name]}] completed message.", :message => message_count) end end end |