Class: Aws::ActiveJob::SQS::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/active_job/sqs/executor.rb

Overview

CLI runner for polling for SQS ActiveJobs

Constant Summary collapse

# Baseline thread pool options. #initialize merges the caller's options over
# these before handing the result to Concurrent::ThreadPoolExecutor.new.
DEFAULTS =
{
  min_threads: 0,
  # Processor count as reported by Concurrent; processor_count is used only
  # when available_processor_count returns nil/false.
  max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count),
  auto_terminate: true,
  idletime: 60, # 1 minute
  fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Executor

Returns a new instance of Executor.



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/aws/active_job/sqs/executor.rb', line 32

# Sets up the job thread pool and starts the background error handler thread.
#
# @param options [Hash] merged over DEFAULTS and passed to
#   Concurrent::ThreadPoolExecutor.new; the :logger and :error_handler keys
#   are additionally read here.
# @option options [Object] :logger logger to use; when missing or nil an
#   ActiveSupport::Logger writing to $stdout is created.
# @option options [Object] :error_handler stored as-is (presumably consumed
#   by handle_errors, which is defined elsewhere -- confirm there).
def initialize(options = {})
  pool_options = DEFAULTS.merge(options)
  @executor = Concurrent::ThreadPoolExecutor.new(pool_options)
  @logger = options[:logger] || ActiveSupport::Logger.new($stdout)
  @task_complete = Concurrent::Event.new
  @post_mutex = Mutex.new

  @error_handler = options[:error_handler]
  @error_queue = Thread::Queue.new
  # Dedicated thread running handle_errors. An uncaught exception in it is
  # re-raised in the main thread (abort_on_exception) without Ruby's own
  # stderr report (report_on_exception disabled).
  @error_handler_thread = Thread.new { handle_errors }
  @error_handler_thread.abort_on_exception = true
  @error_handler_thread.report_on_exception = false
  @shutting_down = Concurrent::AtomicBoolean.new(false)
end

Class Method Details

.clear_hooks ⇒ Object



27
28
29
# File 'lib/aws/active_job/sqs/executor.rb', line 27

# Drops every registered lifecycle hook by resetting the memoized hook table.
#
# @return [nil]
def clear_hooks
  @lifecycle_hooks = nil
end

.lifecycle_hooks ⇒ Object



23
24
25
# File 'lib/aws/active_job/sqs/executor.rb', line 23

# Memoized table of lifecycle hooks, keyed by event name.
#
# Looking up an event that has no entry yet stores and returns a fresh empty
# Array for it, so callers can append without initializing the key.
#
# @return [Hash] the hook table (the same object on every call until it is reset)
def lifecycle_hooks
  return @lifecycle_hooks if @lifecycle_hooks

  @lifecycle_hooks = Hash.new { |hooks, event| hooks[event] = [] }
end

.on_stop(&block) ⇒ Object



19
20
21
# File 'lib/aws/active_job/sqs/executor.rb', line 19

# Registers a block under the :stop lifecycle event.
#
# @param block [Proc] the hook to append to the :stop list
# @return [Array] the :stop hook list after the block has been appended
def on_stop(&block)
  stop_hooks = lifecycle_hooks[:stop]
  stop_hooks.push(block)
end

Instance Method Details

#execute(message) ⇒ Object



46
47
48
49
50
# File 'lib/aws/active_job/sqs/executor.rb', line 46

# Passes a message to _execute while holding @post_mutex, so only one caller
# at a time is inside _execute.
#
# @param message [Object] the message handed unchanged to _execute
# @return [Object] whatever _execute returns
def execute(message)
  @post_mutex.synchronize { _execute(message) }
end

#shutdown(timeout = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/aws/active_job/sqs/executor.rb', line 52

# Shuts the executor down: runs the :stop hooks, stops the thread pool, waits
# for running jobs, then drains and stops the error handler thread.
#
# The @shutting_down flag is true for the duration of the call and is reset
# to false on normal completion.
#
# @param timeout [Numeric, nil] passed to the pool's wait_for_termination
#   (nil presumably means wait without limit -- confirm against concurrent-ruby).
# @return [Object] the result of resetting the @shutting_down flag
def shutdown(timeout = nil)
  @shutting_down.make_true

  run_hooks_for(:stop)
  @executor.shutdown
  if @executor.wait_for_termination(timeout)
    @logger.info 'Clean shutdown complete.  All executing jobs finished.'
  else
    @logger.info "Timeout (#{timeout}) exceeded.  Some jobs may not have " \
                 'finished cleanly.  Unfinished jobs will not be removed from ' \
                 'the queue and can be re-run once their visibility timeout ' \
                 'passes.'
  end
  @error_queue.push(nil) # process any remaining errors and then terminate
  # A thread cannot join itself, so skip the join when shutdown is invoked
  # from the error handler thread.
  @error_handler_thread.join unless @error_handler_thread == Thread.current
  @shutting_down.make_false
end