Class: RJR::ThreadPool

Inherits:
Object show all
Defined in:
lib/rjr/util/thread_pool.rb

Overview

Utility to launches a specified number of threads on instantiation, assigning work to them in order as it arrives.

Supports optional timeout which allows the user to kill and restart threads if a job is taking too long to run.

Config options (must be set before first node is instantiated) collapse

Instance Method Summary collapse

Class Attribute Details

.num_threadsObject

Number of threads to instantiate in local worker pool



88
89
90
# File 'lib/rjr/util/thread_pool.rb', line 88

def num_threads
  @num_threads
end

.timeoutObject

Timeout after which worker threads are killed



91
92
93
# File 'lib/rjr/util/thread_pool.rb', line 91

def timeout
  @timeout
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the pool

Parameters:

  • work (ThreadPoolJob)

    job to execute in first available thread

Returns:

  • self



206
207
208
209
210
# File 'lib/rjr/util/thread_pool.rb', line 206

def <<(work)
  # TODO option to increase worker threads if work queue gets saturated
  @work_queue.push work
  self
end

#joinObject

Block until all worker threads have finished executing

Returns:

  • self



231
232
233
234
# File 'lib/rjr/util/thread_pool.rb', line 231

def join
  @manager_thread.join if @manager_thread
  self
end

#running?Boolean

Return boolean indicating if thread pool is running

Returns:

  • (Boolean)


198
199
200
201
# File 'lib/rjr/util/thread_pool.rb', line 198

def running?
  !@manager_thread.nil? &&
  ['sleep', 'run'].include?(@manager_thread.status)
end

#startObject

Start the thread pool



189
190
191
192
193
194
195
# File 'lib/rjr/util/thread_pool.rb', line 189

def start
  return self unless @terminate
  @terminate = false
  0.upto(@num_threads-1) { |i| launch_worker }
  launch_manager
  self
end

#stopObject

Terminate the thread pool, stopping all worker threads

Returns:

  • self



215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/rjr/util/thread_pool.rb', line 215

def stop
  @terminate = true

  # this will wake up on it's own, but we can
  # speed things up if we manually wake it up,
  # surround w/ block incase thread cleans up on its own
  begin
    @manager_thread.wakeup if @manager_thread
  rescue
  end
  self
end