Class: Traject::ThreadPool
- Inherits:
-
Object
- Object
- Traject::ThreadPool
- Defined in:
- lib/traject/thread_pool.rb
Overview
An abstraction wrapping a Concurrent::ThreadPool in some configuration choices and other apparatus. Concurrent::ThreadPool is a Java ThreadPool executor on jruby for performance, and is ruby-concurrent's own ruby implementation otherwise.
1) Initialize with chosen pool size -- we create fixed size pools, where core and max sizes are the same.
2) If initialized with nil or 0 for threadcount, no thread pool will actually be created, and work sent to the Traject::ThreadPool will just be executed in the caller thread. We call this a nil threadpool. One situation it can be useful is if you are running under MRI, where multi-core parallelism isn't available, so an actual threadpool may not be useful. (Although in some cases a thread pool, especially one with size 1, can be useful in MRI for I/O blocking operations)
3) Use the #maybe_in_threadpool method to send blocks to thread pool for execution -- if configurred with a nil threadcount, your block will just be executed in calling thread. Be careful to not refer to any non-local variables in the block, unless the variable has an object you can use thread-safely!
4) We configure our underlying Concurrent::ThreadPool with a work queue that will buffer up to (pool_size*3) tasks. If the queue is full, the underlying Concurrent::ThreadPool is set up to use the :caller_runs policy meaning the block will end up executing in caller's own thread. With the kind of work we're doing, where each unit of work is small and there are many of them-- the :caller_runs policy serves as an effective 'back pressure' mechanism to keep the work queue from getting too large and exhausting memory, when producers are faster than consumers.
5) Any exceptions raised by pool-executed work are captured accumulated in a thread-safe manner, and can be re-raised in the thread of your choice by calling #raise_collected_exception!
6) When you are done with the threadpool, you can and must call #shutdown_and_wait, which will wait for all current queued work to complete, then return. You can not give any more work to the pool after you do this. By default it'll wait pretty much forever, which should be fine. If you never call shutdown, then queued or in-progress work may be abandoned when the program ends, which would be bad.
7) We will keep track of total times a block is run in thread pool, and total elapsed (wall) time of running all blocks, so an average_execution_ms time can be given. #average_execution_ms may be inaccurate if called when threads are still executing, as it's not entirely thread safe (may get an off by one as to total iterations)
Constant Summary collapse
- @@disable_concurrency =
false
Instance Attribute Summary collapse
-
#pool_size ⇒ Object
readonly
Returns the value of attribute pool_size.
-
#queue_capacity ⇒ Object
readonly
Returns the value of attribute queue_capacity.
Class Method Summary collapse
- .concurrency_disabled? ⇒ Boolean
-
.disable_concurrency! ⇒ Object
Calling Traject::ThreadPool.disable_concurrency! permanently and irrevocably (for program execution) forces all ThreadPools to have a pool_size of 0 -- running all work inline -- so should disable all use of threads in Traject.
Instance Method Summary collapse
-
#collect_exception(e) ⇒ Object
thread-safe way of storing an exception, to raise later in a different thread.
-
#initialize(pool_size) ⇒ ThreadPool
constructor
First arg is pool size, 0 or nil and we'll be a null/no-op pool which executes work in caller thread.
-
#maybe_in_thread_pool(*args) ⇒ Object
Pass it a block, MAYBE gets executed in the bg in a thread pool.
-
#raise_collected_exception! ⇒ Object
If there's a stored collected exception, raise it again now.
-
#shutdown_and_wait ⇒ Object
shutdown threadpool, and wait for all work to complete.
Constructor Details
#initialize(pool_size) ⇒ ThreadPool
First arg is pool size, 0 or nil and we'll be a null/no-op pool which executes work in caller thread.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/traject/thread_pool.rb', line 63 def initialize(pool_size) @thread_pool = nil # assume we don't have one @exceptions_caught_queue = [] # start off without exceptions if self.class.concurrency_disabled? pool_size = 0 end unless pool_size.nil? || pool_size == 0 @pool_size = pool_size.to_i @queue_capacity = pool_size * 3 @thread_pool = Concurrent::ThreadPoolExecutor.new( :min_threads => @pool_size, :max_threads => @pool_size, :max_queue => @queue_capacity, :fallback_policy => :caller_runs ) # A thread-safe queue to collect exceptions cross-threads. # We really only need to save the first exception, but a queue # is a convenient way to store a value concurrency-safely, and # might as well store all of them. @exceptions_caught_queue = Queue.new end end |
Instance Attribute Details
#pool_size ⇒ Object (readonly)
Returns the value of attribute pool_size.
51 52 53 |
# File 'lib/traject/thread_pool.rb', line 51 def pool_size @pool_size end |
#queue_capacity ⇒ Object (readonly)
Returns the value of attribute queue_capacity.
51 52 53 |
# File 'lib/traject/thread_pool.rb', line 51 def queue_capacity @queue_capacity end |
Class Method Details
.concurrency_disabled? ⇒ Boolean
59 |
# File 'lib/traject/thread_pool.rb', line 59 def self.concurrency_disabled? ; @@disable_concurrency ; end |
.disable_concurrency! ⇒ Object
Calling Traject::ThreadPool.disable_concurrency! permanently and irrevocably (for program execution) forces all ThreadPools to have a pool_size of 0 -- running all work inline -- so should disable all use of threads in Traject.
58 |
# File 'lib/traject/thread_pool.rb', line 58 def self.disable_concurrency! ; @@disable_concurrency = true ; end |
Instance Method Details
#collect_exception(e) ⇒ Object
thread-safe way of storing an exception, to raise later in a different thread. We don't guarantee that we can store more than one at a time, only the first one recorded may be stored.
139 140 141 |
# File 'lib/traject/thread_pool.rb', line 139 def collect_exception(e) @exceptions_caught_queue.push(e) end |
#maybe_in_thread_pool(*args) ⇒ Object
Pass it a block, MAYBE gets executed in the bg in a thread pool. Maybe gets executed in the calling thread.
There are actually two 'maybes':
If Traject::ThreadPool was configured with null thread pool, then ALL work will be executed in calling thread.
If there is a thread pool, but it's work queue is full, then a job will be executed in calling thread (because we configured our java thread pool with a limited sized queue, and CallerRunsPolicy rejection strategy)
You can pass arbitrary arguments to the method, that will then be passed to your block -- similar to how ruby Thread.new works. This is convenient for creating variables unique to the block that won't be shared outside the thread:
thread_pool.maybe_in_thread_pool(x, y) do |x1, y1|
100.times do
something_with(x1)
end
end
x = "someting else"
# If we hadn't passed args with block, and had just
# used x in the block, it'd be the SAME x as this one,
# and would be pointing to a different string now!
Note, that just makes block-local variables, it doesn't help you with whether a data structure itself is thread safe.
119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/traject/thread_pool.rb', line 119 def maybe_in_thread_pool(*args) if @thread_pool @thread_pool.post do begin yield(*args) rescue Exception => e collect_exception(e) end end else yield(*args) end end |
#raise_collected_exception! ⇒ Object
If there's a stored collected exception, raise it again now. Call this to re-raise exceptions caught in other threads in the thread of your choice.
If you call this method on a ThreadPool initialized with nil as a non-functioning threadpool -- then this method is just a no-op.
150 151 152 153 154 155 |
# File 'lib/traject/thread_pool.rb', line 150 def raise_collected_exception! unless @exceptions_caught_queue.empty? e = @exceptions_caught_queue.pop raise e end end |
#shutdown_and_wait ⇒ Object
shutdown threadpool, and wait for all work to complete. this one is also a no-op if you have a null ThreadPool that doesn't really have a threadpool at all.
returns elapsed time in seconds it took to shutdown
162 163 164 165 166 167 168 169 170 171 |
# File 'lib/traject/thread_pool.rb', line 162 def shutdown_and_wait start_t = Time.now if @thread_pool @thread_pool.shutdown @thread_pool.wait_for_termination end return (Time.now - start_t) end |