Class: Exekutor::Internal::Executor
- Inherits:
-
Object
- Object
- Exekutor::Internal::Executor
- Includes:
- Callbacks, Executable, Logger
- Defined in:
- lib/exekutor/internal/executor.rb
Overview
Executes jobs from a thread pool
Defined Under Namespace
Classes: JobExecutionTimeout, ThreadPoolExecutor
Constant Summary
Constants included from Executable
Exekutor::Internal::Executable::STATES
Instance Attribute Summary collapse
-
#pending_job_updates ⇒ Object
readonly
Returns the value of attribute pending_job_updates.
Instance Method Summary collapse
-
#active_job_ids ⇒ Array<String>
The ids of the jobs that are currently being executed.
-
#available_threads ⇒ Integer
The number of available threads to execute jobs on.
-
#initialize(min_threads: 1, max_threads: default_max_threads, max_thread_idletime: 180, delete_completed_jobs: false, delete_discarded_jobs: false, delete_failed_jobs: false) ⇒ Executor
constructor
Create a new executor.
-
#kill ⇒ Object
Kills the executor.
-
#maximum_threads ⇒ Integer
The maximum number of threads to execute jobs on.
-
#minimum_threads ⇒ Integer
The minimum number of threads to execute jobs on.
-
#post(job) ⇒ Object
Executes the job on one of the execution threads.
-
#prune_pool ⇒ Object
Prunes the inactive threads from the pool.
-
#start ⇒ Object
Starts the executor.
-
#stop ⇒ Object
Stops the executor.
Methods included from Executable
#consecutive_errors, #restart_delay, #running?, #state
Methods included from Callbacks
Constructor Details
#initialize(min_threads: 1, max_threads: default_max_threads, max_thread_idletime: 180, delete_completed_jobs: false, delete_discarded_jobs: false, delete_failed_jobs: false) ⇒ Executor
Create a new executor
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/exekutor/internal/executor.rb', line 27 def initialize(min_threads: 1, max_threads: default_max_threads, max_thread_idletime: 180, delete_completed_jobs: false, delete_discarded_jobs: false, delete_failed_jobs: false) super() @executor = ThreadPoolExecutor.new name: "exekutor-job", fallback_policy: :abort, max_queue: max_threads, min_threads: min_threads, max_threads: max_threads, idletime: max_thread_idletime @queued_job_ids = Concurrent::Array.new @active_job_ids = Concurrent::Array.new @pending_job_updates = Concurrent::Hash.new @options = { delete_completed_jobs: delete_completed_jobs, delete_discarded_jobs: delete_discarded_jobs, delete_failed_jobs: delete_failed_jobs }.freeze end |
Instance Attribute Details
#pending_job_updates ⇒ Object (readonly)
Returns the value of attribute pending_job_updates.
16 17 18 |
# File 'lib/exekutor/internal/executor.rb', line 16 def pending_job_updates @pending_job_updates end |
Instance Method Details
#active_job_ids ⇒ Array<String>
Returns The ids of the jobs that are currently being executed.
95 96 97 |
# File 'lib/exekutor/internal/executor.rb', line 95 def active_job_ids @active_job_ids.dup.to_a end |
#available_threads ⇒ Integer
Returns the number of available threads to execute jobs on. Returns 0 if the executor is not running.
76 77 78 79 80 81 82 |
# File 'lib/exekutor/internal/executor.rb', line 76 def available_threads if @executor.running? @executor.available_threads else 0 end end |
#kill ⇒ Object
Kills the executor
58 59 60 61 62 63 |
# File 'lib/exekutor/internal/executor.rb', line 58 def kill Thread.new { compare_and_set_state :started, :killed } @executor.kill release_assigned_jobs end |
#maximum_threads ⇒ Integer
Returns the maximum number of threads to execute jobs on.
90 91 92 |
# File 'lib/exekutor/internal/executor.rb', line 90 def maximum_threads @executor.max_length end |
#minimum_threads ⇒ Integer
Returns the minimum number of threads to execute jobs on.
85 86 87 |
# File 'lib/exekutor/internal/executor.rb', line 85 def minimum_threads @executor.min_length end |
#post(job) ⇒ Object
Executes the job on one of the execution threads. Releases the job if there is no thread available to execute the job.
67 68 69 70 71 72 73 |
# File 'lib/exekutor/internal/executor.rb', line 67 def post(job) @executor.post(job) { |*args| execute(*args) } @queued_job_ids.append(job[:id]) rescue Concurrent::RejectedExecutionError logger.error "Ran out of threads! Releasing job #{job[:id]}" update_job job, status: "p", worker_id: nil end |
#prune_pool ⇒ Object
Prunes the inactive threads from the pool.
100 101 102 |
# File 'lib/exekutor/internal/executor.rb', line 100 def prune_pool @executor.prune_pool end |
#start ⇒ Object
Starts the executor
46 47 48 |
# File 'lib/exekutor/internal/executor.rb', line 46 def start self.state = :started end |
#stop ⇒ Object
Stops the executor
51 52 53 54 55 |
# File 'lib/exekutor/internal/executor.rb', line 51 def stop self.state = :stopped @executor.shutdown end |