Class: Aws::ActiveJob::SQS::Executor
- Inherits: Object (Aws::ActiveJob::SQS::Executor < Object)
- Defined in: lib/aws/active_job/sqs/executor.rb
Overview
CLI runner for polling for SQS ActiveJobs
Constant Summary
- DEFAULTS =
    {
      min_threads: 0,
      max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count),
      auto_terminate: true,
      idletime: 60, # 1 minute
      fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
    }.freeze
Instance Method Summary
- #execute(message) ⇒ Object
- #initialize(options = {}) ⇒ Executor (constructor): a new instance of Executor.
- #shutdown(timeout = nil) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Executor
Returns a new instance of Executor.
    # File 'lib/aws/active_job/sqs/executor.rb', line 18
    def initialize(options = {})
      @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
      @retry_standard_errors = options[:retry_standard_errors]
      @logger = options[:logger] || ActiveSupport::Logger.new($stdout)
      @task_complete = Concurrent::Event.new
    end
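The constructor layers caller options over DEFAULTS with Hash#merge, so any key the caller passes wins and every other default is kept. The following is a minimal sketch of that merge only, not the gem's code: SKETCH_DEFAULTS and pool_options are hypothetical names, and Etc.nprocessors stands in for the Concurrent processor-count helpers so that the example runs without any gems.

```ruby
require 'etc'

# Hypothetical stand-in for Executor::DEFAULTS. Etc.nprocessors is an
# assumption replacing the Concurrent processor-count helpers.
SKETCH_DEFAULTS = {
  min_threads: 0,
  max_threads: Etc.nprocessors,
  auto_terminate: true,
  idletime: 60,            # seconds
  fallback_policy: :abort
}.freeze

# Hypothetical helper: caller-supplied options override the defaults,
# and keys that are not in the defaults pass through unchanged.
def pool_options(options = {})
  SKETCH_DEFAULTS.merge(options)
end

merged = pool_options(max_threads: 4, retry_standard_errors: true)
puts merged[:max_threads]      # prints 4 (caller's value)
puts merged[:idletime]         # prints 60 (default retained)
puts merged[:fallback_policy]  # prints abort (default retained)
```

Because merge returns a new hash, the frozen defaults are never mutated. Note that in the constructor above the whole options hash is merged, so keys such as :logger and :retry_standard_errors are part of the hash handed to the thread pool as well.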
Instance Method Details
#execute(message) ⇒ Object
    # File 'lib/aws/active_job/sqs/executor.rb', line 25
    def execute(message)
      post_task(message)
    rescue Concurrent::RejectedExecutionError
      # no capacity, wait for a task to complete
      @task_complete.reset
      @task_complete.wait
      retry
    end
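The method above applies backpressure: when the pool rejects a task, the caller blocks until some running task finishes and then retries the same message. The sketch below illustrates that rescue-and-retry shape using only the standard library. It is an assumption-laden illustration, not the gem's implementation: TinyPool, Rejected, and wait_for_slot are hypothetical names, and a Mutex with a ConditionVariable is swapped in for Concurrent::Event.

```ruby
# Hypothetical error standing in for Concurrent::RejectedExecutionError.
class Rejected < StandardError; end

# Hypothetical bounded pool: at most `capacity` tasks run at once and
# extra submissions are rejected instead of queued.
class TinyPool
  def initialize(capacity)
    @capacity = capacity
    @running = 0
    @lock = Mutex.new
    @slot_free = ConditionVariable.new
    @threads = []
  end

  def post(&task)
    @lock.synchronize do
      raise Rejected if @running >= @capacity

      @running += 1
      @threads << Thread.new do
        begin
          task.call
        ensure
          # Free the slot and wake one waiting submitter.
          @lock.synchronize do
            @running -= 1
            @slot_free.signal
          end
        end
      end
    end
  end

  # Blocks the caller until at least one slot is free.
  def wait_for_slot
    @lock.synchronize do
      @slot_free.wait(@lock) while @running >= @capacity
    end
  end

  # Waits for every task posted so far to finish.
  def join
    @lock.synchronize { @threads.dup }.each(&:join)
  end
end

# Same shape as Executor#execute: post, and on rejection wait and retry.
def execute(pool, message, results)
  pool.post do
    sleep 0.01
    results << message
  end
rescue Rejected
  pool.wait_for_slot
  retry
end

pool = TinyPool.new(2)
results = Queue.new
5.times { |i| execute(pool, "msg-#{i}", results) }
pool.join
puts results.size # prints 5
```

The sketch re-checks the capacity condition under the lock before sleeping, so a completion that happens between the rejection and the wait is not missed. No message is dropped: each rejected submission is retried until the pool accepts it.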
#shutdown(timeout = nil) ⇒ Object
    # File 'lib/aws/active_job/sqs/executor.rb', line 34
    def shutdown(timeout = nil)
      @executor.shutdown
      clean_shutdown = @executor.wait_for_termination(timeout)
      if clean_shutdown
        @logger.info 'Clean shutdown complete. All executing jobs finished.'
      else
        @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \
                     'finished cleanly. Unfinished jobs will not be removed from ' \
                     'the queue and can be re-run once their visibility timeout ' \
                     'passes.'
      end
    end
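The method above stops accepting work, waits up to timeout seconds for running jobs, and logs whether the wait finished cleanly. The sketch below models only the bounded wait and the two log branches with plain threads from the standard library. The helper names, the boolean return value, and the shortened log messages are assumptions made for illustration; the real method delegates the wait to the thread pool.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: waits up to `timeout` seconds in total (nil means
# wait forever) for all threads. Returns true only if every one finished.
def wait_for_termination(threads, timeout = nil)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (now.call + timeout)
  threads.all? do |thread|
    if deadline
      # Thread#join(limit) returns nil when the limit expires first.
      thread.join([deadline - now.call, 0].max)
    else
      thread.join
    end
  end
end

# Hypothetical shutdown: logs the outcome and returns it as a boolean.
def shutdown(threads, logger, timeout = nil)
  if wait_for_termination(threads, timeout)
    logger.info 'Clean shutdown complete.'
    true
  else
    logger.info "Timeout (#{timeout}) exceeded."
    false
  end
end

log = StringIO.new
logger = Logger.new(log)

fast = [Thread.new { sleep 0.01 }]
slow = [Thread.new { sleep 5 }]

clean = shutdown(fast, logger, 1)         # finishes well within 1 second
timed_out = shutdown(slow, logger, 0.05)  # still sleeping at the deadline
slow.each(&:kill)                         # tidy up the sketch's own thread

puts clean      # prints true
puts timed_out  # prints false
```

A nil timeout waits indefinitely, which matches the default argument of the documented method. As the timeout log message in the source explains, jobs that miss the deadline are left on the queue and become eligible to run again once their visibility timeout passes.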