Class: Aws::ActiveJob::SQS::Executor
- Inherits:
-
Object
- Object
- Aws::ActiveJob::SQS::Executor
- Defined in:
- lib/aws/active_job/sqs/executor.rb
Overview
CLI runner for polling for SQS ActiveJobs
Constant Summary collapse
- DEFAULTS =
{ min_threads: 0, max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count), auto_terminate: true, idletime: 60, # 1 minute fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled }.freeze
Class Method Summary collapse
Instance Method Summary collapse
- #execute(message) ⇒ Object
-
#initialize(options = {}) ⇒ Executor
constructor
A new instance of Executor.
- #shutdown(timeout = nil) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Executor
Returns a new instance of Executor.
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/aws/active_job/sqs/executor.rb', line 32 def initialize( = {}) @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge()) @logger = [:logger] || ActiveSupport::Logger.new($stdout) @task_complete = Concurrent::Event.new @post_mutex = Mutex.new @error_handler = [:error_handler] @error_queue = Thread::Queue.new @error_handler_thread = Thread.new(&method(:handle_errors)) @error_handler_thread.abort_on_exception = true @error_handler_thread.report_on_exception = false @shutting_down = Concurrent::AtomicBoolean.new(false) end |
Class Method Details
.clear_hooks ⇒ Object
27 28 29 |
# File 'lib/aws/active_job/sqs/executor.rb', line 27 def clear_hooks @lifecycle_hooks = nil end |
.lifecycle_hooks ⇒ Object
23 24 25 |
# File 'lib/aws/active_job/sqs/executor.rb', line 23 def lifecycle_hooks @lifecycle_hooks ||= Hash.new { |h, k| h[k] = [] } end |
.on_stop(&block) ⇒ Object
19 20 21 |
# File 'lib/aws/active_job/sqs/executor.rb', line 19 def on_stop(&block) lifecycle_hooks[:stop] << block end |
Instance Method Details
#execute(message) ⇒ Object
46 47 48 49 50 |
# File 'lib/aws/active_job/sqs/executor.rb', line 46 def execute() @post_mutex.synchronize do _execute() end end |
#shutdown(timeout = nil) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/aws/active_job/sqs/executor.rb', line 52 def shutdown(timeout = nil) @shutting_down.make_true run_hooks_for(:stop) @executor.shutdown clean_shutdown = @executor.wait_for_termination(timeout) if clean_shutdown @logger.info 'Clean shutdown complete. All executing jobs finished.' else @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ 'finished cleanly. Unfinished jobs will not be removed from ' \ 'the queue and can be ru-run once their visibility timeout ' \ 'passes.' end @error_queue.push(nil) # process any remaining errors and then terminate @error_handler_thread.join unless @error_handler_thread == Thread.current @shutting_down.make_false end |