Class: Aws::ActiveJob::SQS::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/active_job/sqs/executor.rb

Overview

CLI runner for polling for SQS ActiveJobs

Constant Summary collapse

# Default thread-pool options. #initialize merges caller-supplied options over
# these and passes the result to Concurrent::ThreadPoolExecutor.new, so any key
# given by the caller overrides the value listed here.
DEFAULTS =
{
  min_threads: 0,
  # Prefer available_processor_count; fall back to processor_count when the
  # former returns nil. Integer() coerces the result to a whole number.
  max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count),
  auto_terminate: true,
  idletime: 60, # 1 minute
  fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Executor

Returns a new instance of Executor.



18
19
20
21
22
23
# File 'lib/aws/active_job/sqs/executor.rb', line 18

# Builds the thread pool and the supporting state used to run jobs.
#
# @param options [Hash] settings merged over DEFAULTS and handed to
#   Concurrent::ThreadPoolExecutor.new. Two keys are also read directly:
#   * :logger - logger to use; when absent or nil an ActiveSupport::Logger
#     writing to $stdout is created
#   * :retry_standard_errors - stored as-is in @retry_standard_errors
def initialize(options = {})
  pool_settings = DEFAULTS.merge(options)
  @executor = Concurrent::ThreadPoolExecutor.new(pool_settings)

  # Event that #execute resets and waits on when the pool rejects a task.
  @task_complete = Concurrent::Event.new
  @logger = options[:logger] || ActiveSupport::Logger.new($stdout)
  @retry_standard_errors = options[:retry_standard_errors]
end

Instance Method Details

#execute(message) ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/aws/active_job/sqs/executor.rb', line 25

# Submits +message+ through +post_task+, blocking the calling thread while the
# pool reports that it has no capacity. The submission is retried without any
# attempt limit until it is accepted or a different error is raised.
#
# @param message the message handed unchanged to +post_task+
# @return [Object] whatever +post_task+ returns once the submission succeeds
def execute(message)
  post_task(message)
rescue Concurrent::RejectedExecutionError
  # no capacity, wait for a task to complete
  # The event is cleared first so that the wait below only returns for a
  # signal raised after this point; the submission is then attempted again.
  # NOTE(review): presumably post_task arranges for @task_complete to be set
  # when a job finishes (not visible here). A signal raised between the
  # rejection and this reset would be cleared -- confirm this cannot stall.
  # NOTE(review): if the executor also rejects work after #shutdown, this
  # wait/retry cycle would never finish -- confirm callers stop calling
  # execute before shutting down.
  @task_complete.reset
  @task_complete.wait
  retry
end

#shutdown(timeout = nil) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/aws/active_job/sqs/executor.rb', line 34

# Asks the thread pool to shut down, waits for it to terminate and logs
# whether termination completed within the allowed time.
#
# @param timeout [Numeric, nil] passed straight to
#   +@executor.wait_for_termination+; nil is forwarded unchanged
# @return [Object] the return value of the +@logger.info+ call
def shutdown(timeout = nil)
  @executor.shutdown

  # wait_for_termination is truthy only when the pool finished in time.
  if @executor.wait_for_termination(timeout)
    @logger.info 'Clean shutdown complete.  All executing jobs finished.'
  else
    # Jobs still running are left on the queue; the message tells the operator
    # they become eligible again after the visibility timeout.
    @logger.info "Timeout (#{timeout}) exceeded.  Some jobs may not have " \
                 'finished cleanly.  Unfinished jobs will not be removed from ' \
                 'the queue and can be re-run once their visibility timeout ' \
                 'passes.'
  end
end