Class: Concurrent::JavaThreadPoolExecutor

Inherits:
Object
  • Object
Includes:
JavaExecutor
Defined in:
lib/concurrent/executor/java_thread_pool_executor.rb

Overview

Note:

When running on the JVM (JRuby) this class will inherit from `JavaThreadPoolExecutor`. On all other platforms it will inherit from `RubyThreadPoolExecutor`.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or `proc` objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received. When there are more tasks queued than there are threads to execute them, the pool will create new threads, up to the configured maximum. Similarly, threads that are idle for too long will be garbage collected, down to the configured minimum. Should a thread crash it, too, will be garbage collected.
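The basic idea of worker threads draining a shared task queue can be sketched in plain Ruby. This is a minimal illustration, not the library's implementation: `TinyPool` is a hypothetical name, the pool has a fixed size, and it does none of the growing, shrinking, or overflow handling described here.

```ruby
# Hypothetical fixed-size pool: N worker threads draining one shared FIFO queue.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Each worker blocks on the queue until a task (or the stop marker) arrives.
        while (task = @queue.pop) != :stop
          task.call
        end
      end
    end
  end

  # Add a task (a block) to the end of the queue.
  def post(&task)
    @queue << task
    true
  end

  # Let already-queued tasks finish, then stop every worker.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(2)
5.times { |i| pool.post { results << i * i } }
pool.shutdown
puts results.size
```

Because the stop markers are queued behind the five tasks, every task runs before the workers exit.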

`ThreadPoolExecutor` is based on the Java class of the same name. From the official Java documentation:

> Thread pools address two different problems: they usually provide
> improved performance when executing large numbers of asynchronous tasks,
> due to reduced per-task invocation overhead, and they provide a means
> of bounding and managing the resources, including threads, consumed
> when executing a collection of tasks. Each ThreadPoolExecutor also
> maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many
> adjustable parameters and extensibility hooks. However, programmers are
> urged to use the more convenient Executors factory methods
> [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation),
> [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single
> background thread), that preconfigure settings for the most common usage
> scenarios.

Thread pools support several configuration options:

  • `max_threads`: The maximum number of threads that may be created in the pool.

  • `min_threads`: The minimum number of threads that may be retained in the pool.

  • `idletime`: The number of seconds that a thread may be idle before being reclaimed.

  • `max_queue`: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` subsequent tasks will be rejected in accordance with the configured `overflow_policy`.

  • `overflow_policy`: The policy defining how rejected tasks are handled.

Three overflow policies are supported:

  • `:abort`: Raise a `RejectedExecutionError` exception and discard the task.

  • `:discard`: Silently discard the task and return `nil` as the task result.

  • `:caller_runs`: Execute the task on the calling thread.
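The three policies can be illustrated with a small plain-Ruby sketch. Everything here is hypothetical: `PolicySketch` is not part of the library, the local `RejectedExecutionError` only stands in for the library's exception, and queued tasks are never executed. The sketch only shows what happens to a task that arrives when the queue is full.

```ruby
# Stand-in for the library's RejectedExecutionError (hypothetical local class).
class RejectedExecutionError < StandardError; end

# Hypothetical bounded task queue that applies an overflow policy when full.
class PolicySketch
  POLICIES = [:abort, :discard, :caller_runs].freeze

  attr_reader :queue

  def initialize(max_queue, overflow_policy = :abort)
    unless POLICIES.include?(overflow_policy)
      raise ArgumentError, "#{overflow_policy} is not a valid overflow policy"
    end
    @max_queue = max_queue
    @overflow_policy = overflow_policy
    @queue = []
  end

  # Enqueue the task, or handle it according to the policy when the queue is full.
  def post(&task)
    if @queue.size < @max_queue
      @queue << task
      return true
    end

    case @overflow_policy
    when :abort       then raise RejectedExecutionError, 'queue is full'
    when :discard     then nil        # task is silently dropped
    when :caller_runs then task.call  # task runs on the calling thread
    end
  end
end
```

With a queue of size one, the second `post` is rejected: `:abort` raises, `:discard` returns `nil`, and `:caller_runs` runs the block immediately on the thread that called `post`.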

Direct Known Subclasses

JavaCachedThreadPool, JavaFixedThreadPool

Constant Summary

DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE

Default maximum number of threads that will be created in the pool.

DEFAULT_MIN_POOL_SIZE = 0

Default minimum number of threads that will be retained in the pool.

DEFAULT_MAX_QUEUE_SIZE = 0

Default maximum number of tasks that may be added to the task queue.

DEFAULT_THREAD_IDLETIMEOUT = 60

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

OVERFLOW_POLICIES =
{
  abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
  discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
  caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
}.freeze

The set of possible overflow policies that may be set at thread pool creation.

Instance Attribute Summary

Instance Method Summary

Methods included from JavaExecutor

#<<, #kill, #post, #shutdown?, #shuttingdown?, #wait_for_termination

Constructor Details

#initialize(opts = {}) ⇒ JavaThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    the minimum number of threads to be retained

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound

  • :overflow_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached `max_queue`

Raises:

  • (ArgumentError) if `:max_threads` is less than one

  • (ArgumentError) if `:min_threads` is less than zero

  • (ArgumentError) if `:overflow_policy` is not one of the values specified in `OVERFLOW_POLICIES`



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 65

def initialize(opts = {})
  min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @overflow_policy = opts.fetch(:overflow_policy, :abort)

  raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
  raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
  raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)

  if min_length == 0 && @max_queue == 0
    queue = java.util.concurrent.SynchronousQueue.new
  elsif @max_queue == 0
    queue = java.util.concurrent.LinkedBlockingQueue.new
  else
    queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
  end

  @executor = java.util.concurrent.ThreadPoolExecutor.new(
    min_length, max_length,
    idletime, java.util.concurrent.TimeUnit::SECONDS,
    queue, OVERFLOW_POLICIES[@overflow_policy].new)

  set_shutdown_hook
end
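The queue selection in the constructor above can be read as a small decision table. The sketch below restates it in plain Ruby so the three cases are easy to compare; `queue_kind` and the returned symbols are hypothetical names used only for illustration, not part of the library.

```ruby
# Hypothetical helper mirroring the branch order of the constructor above.
def queue_kind(min_threads, max_queue)
  if min_threads == 0 && max_queue == 0
    :synchronous       # SynchronousQueue: direct hand-off, nothing is buffered
  elsif max_queue == 0
    :unbounded_linked  # LinkedBlockingQueue.new: no capacity limit
  else
    :bounded_linked    # LinkedBlockingQueue.new(max_queue): capacity is max_queue
  end
end

queue_kind(0, 0)   # defaults: tasks are handed straight to a thread
queue_kind(5, 0)   # core threads with an unbounded backlog
queue_kind(5, 100) # backlog capped at 100 tasks
```

One consequence worth keeping in mind, based on how Java's `ThreadPoolExecutor` is documented: with an unbounded work queue, new tasks wait in the queue once all core threads are busy, so a pool configured with a non-zero `min_threads` and `max_queue` of zero is not expected to grow beyond `min_threads`.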

Instance Attribute Details

#max_length ⇒ Integer (readonly)

The maximum number of threads that may be created in the pool.

Returns:

  • the max_length



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 31

def max_length
  @max_length
end

#max_queue ⇒ Object (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` subsequent tasks will be rejected in accordance with the configured `overflow_policy`.



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 36

def max_queue
  @max_queue
end

#overflow_policy ⇒ Object (readonly)

The policy defining how rejected tasks (tasks received once the queue size reaches the configured `max_queue`) are handled. Must be one of the values specified in `OVERFLOW_POLICIES`.



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 41

def overflow_policy
  @overflow_policy
end

Instance Method Details

#can_overflow? ⇒ Boolean

Returns:

  • (Boolean) `true` if `max_queue` is non-zero (the queue is bounded), otherwise `false`



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 92

def can_overflow?
  @max_queue != 0
end

#completed_task_count ⇒ Integer

The number of tasks that have been completed by the pool since construction.

Returns:

  • the completed_task_count



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 135

def completed_task_count
  @executor.getCompletedTaskCount
end

#idletime ⇒ Integer

The number of seconds that a thread may be idle before being reclaimed.

Returns:

  • the idletime



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 142

def idletime
  @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
end

#largest_length ⇒ Integer

The largest number of threads that have been created in the pool since construction.

Returns:

  • the largest_length



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 121

def largest_length
  @executor.getLargestPoolSize
end

#length ⇒ Integer
Also known as: current_length

The number of threads currently in the pool.

Returns:

  • the length



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 113

def length
  @executor.getPoolSize
end

#min_length ⇒ Integer

The minimum number of threads that may be retained in the pool.

Returns:

  • the min_length



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 99

def min_length
  @executor.getCorePoolSize
end

#queue_length ⇒ Integer

The number of tasks in the queue awaiting execution.

Returns:

  • the queue_length



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 149

def queue_length
  @executor.getQueue.size
end

#remaining_capacity ⇒ Integer

Number of tasks that may be enqueued before reaching `max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • the remaining_capacity



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 157

def remaining_capacity
  @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
end
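The return value can be pictured with a small plain-Ruby sketch. `remaining_capacity_sketch` is a hypothetical helper, and it assumes that a bounded queue reports its capacity minus its current size, which is how Java's `LinkedBlockingQueue#remainingCapacity` is documented.

```ruby
# Hypothetical helper: max_queue is the configured bound, queued is the
# number of tasks currently waiting in the queue.
def remaining_capacity_sketch(max_queue, queued)
  return -1 if max_queue == 0  # zero means "unbounded", reported as -1
  max_queue - queued           # free slots left in a bounded queue
end

remaining_capacity_sketch(0, 250)  # unbounded queue
remaining_capacity_sketch(10, 3)   # bounded queue with three tasks waiting
```

Callers should check for -1 before treating the result as a count of free slots.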

#running? ⇒ Boolean

Is the thread pool running?

Returns:

  • `true` when running, `false` when shutting down or shutdown



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 173

def running?
  super && ! @executor.isTerminating
end

#scheduled_task_count ⇒ Integer

The number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • the scheduled_task_count



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 128

def scheduled_task_count
  @executor.getTaskCount
end

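#shutdown ⇒ Object

Begin an orderly shutdown. No new tasks will be accepted. Note that the implementation below clears the work queue (`@executor.getQueue.clear`) right after calling `super`, so tasks still waiting in the queue may be discarded rather than executed. Has no additional effect if the thread pool is not running.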



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 180

def shutdown
  super
  @executor.getQueue.clear
  nil
end

#status ⇒ Object

Deprecated: this method will be removed soon. It was supposed to return the status of each thread, but the Java API does not provide a way to get a thread's status, so an empty Array is returned instead.



# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 164

def status
  warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
  warn "Calls to `status` return an empty Array. Java ThreadPoolExecutor does not provide thread's status."
  []
end