Class: Exekutor::Internal::Executor

Inherits:
Object
  • Object
show all
Includes:
Callbacks, Executable, Logger
Defined in:
lib/exekutor/internal/executor.rb

Overview

Executes jobs from a thread pool

Defined Under Namespace

Classes: JobExecutionTimeout, ThreadPoolExecutor

Constant Summary

Constants included from Executable

Exekutor::Internal::Executable::STATES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Executable

#consecutive_errors, #restart_delay, #running?, #state

Methods included from Callbacks

#add_callback

Constructor Details

#initialize(min_threads: 1, max_threads: default_max_threads, max_thread_idletime: 180, delete_completed_jobs: false, delete_discarded_jobs: false, delete_failed_jobs: false) ⇒ Executor

Create a new executor

Parameters:

  • min_threads (Integer) (defaults to: 1)

    the minimum number of threads that should be active

  • max_threads (Integer) (defaults to: default_max_threads)

    the maximum number of threads that may be active

  • max_thread_idletime (Integer) (defaults to: 180)

    the amount of seconds a thread may be idle before being reclaimed

  • delete_completed_jobs (Boolean) (defaults to: false)

    whether to delete jobs that complete successfully

  • delete_discarded_jobs (Boolean) (defaults to: false)

    whether to delete jobs that are discarded

  • delete_failed_jobs (Boolean) (defaults to: false)

    whether to delete jobs that fail



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/exekutor/internal/executor.rb', line 27

def initialize(min_threads: 1, max_threads: default_max_threads, max_thread_idletime: 180,
               delete_completed_jobs: false, delete_discarded_jobs: false, delete_failed_jobs: false)
  super()
  @executor = ThreadPoolExecutor.new name: "exekutor-job", fallback_policy: :abort, max_queue: max_threads,
                                     min_threads: min_threads, max_threads: max_threads,
                                     idletime: max_thread_idletime
  @queued_job_ids = Concurrent::Array.new
  @active_job_ids = Concurrent::Array.new
  @pending_job_updates = Concurrent::Hash.new
  @options = {
    delete_completed_jobs: delete_completed_jobs,
    delete_discarded_jobs: delete_discarded_jobs,
    delete_failed_jobs: delete_failed_jobs
  }.freeze
end

Instance Attribute Details

#pending_job_updatesObject (readonly)

Returns the value of attribute pending_job_updates.



16
17
18
# File 'lib/exekutor/internal/executor.rb', line 16

def pending_job_updates
  @pending_job_updates
end

Instance Method Details

#active_job_idsArray<String>

Returns The ids of the jobs that are currently being executed.

Returns:

  • (Array<String>)

    The ids of the jobs that are currently being executed



95
96
97
# File 'lib/exekutor/internal/executor.rb', line 95

def active_job_ids
  @active_job_ids.dup.to_a
end

#available_threadsInteger

Returns the number of available threads to execute jobs on. Returns 0 if the executor is not running.

Returns:

  • (Integer)

    the number of available threads to execute jobs on. Returns 0 if the executor is not running.



76
77
78
79
80
81
82
# File 'lib/exekutor/internal/executor.rb', line 76

def available_threads
  if @executor.running?
    @executor.available_threads
  else
    0
  end
end

#killObject

Kills the executor



58
59
60
61
62
63
# File 'lib/exekutor/internal/executor.rb', line 58

def kill
  Thread.new { compare_and_set_state :started, :killed }
  @executor.kill

  release_assigned_jobs
end

#maximum_threadsInteger

Returns the maximum number of threads to execute jobs on.

Returns:

  • (Integer)

    the maximum number of threads to execute jobs on.



90
91
92
# File 'lib/exekutor/internal/executor.rb', line 90

def maximum_threads
  @executor.max_length
end

#minimum_threadsInteger

Returns the minimum number of threads to execute jobs on.

Returns:

  • (Integer)

    the minimum number of threads to execute jobs on.



85
86
87
# File 'lib/exekutor/internal/executor.rb', line 85

def minimum_threads
  @executor.min_length
end

#post(job) ⇒ Object

Executes the job on one of the execution threads. Releases the job if there is no thread available to execute the job.



67
68
69
70
71
72
73
# File 'lib/exekutor/internal/executor.rb', line 67

def post(job)
  @executor.post(job) { |*args| execute(*args) }
  @queued_job_ids.append(job[:id])
rescue Concurrent::RejectedExecutionError
  logger.error "Ran out of threads! Releasing job #{job[:id]}"
  update_job job, status: "p", worker_id: nil
end

#prune_poolObject

Prunes the inactive threads from the pool.



100
101
102
# File 'lib/exekutor/internal/executor.rb', line 100

def prune_pool
  @executor.prune_pool
end

#startObject

Starts the executor



46
47
48
# File 'lib/exekutor/internal/executor.rb', line 46

def start
  self.state = :started
end

#stopObject

Stops the executor



51
52
53
54
55
# File 'lib/exekutor/internal/executor.rb', line 51

def stop
  self.state = :stopped

  @executor.shutdown
end