Class: Cabriolet::Parallel::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/cabriolet/parallel.rb

Overview

Thread pool for custom parallel operations

Instance Method Summary collapse

Constructor Details

#initialize(size: Extractor::DEFAULT_WORKERS) ⇒ ThreadPool

Returns a new instance of ThreadPool.



223
224
225
226
227
228
# File 'lib/cabriolet/parallel.rb', line 223

def initialize(size: Extractor::DEFAULT_WORKERS)
  @size = size
  @queue = Queue.new
  @threads = []
  @running = false
end

Instance Method Details

#map(items) {|item| ... } ⇒ Array

Execute tasks in parallel with automatic cleanup

Parameters:

  • items (Array)

    Items to process

Yields:

  • (item)

    Process each item

Returns:

  • (Array)

    Results from each task



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/cabriolet/parallel.rb', line 273

def map(items)
  start
  results = []
  results_mutex = Mutex.new

  items.each_with_index do |item, index|
    submit do
      result = yield(item)
      results_mutex.synchronize do
        results[index] = result
      end
    end
  end

  shutdown(wait: true)
  results
end

#shutdown(wait: true) ⇒ Object

Shutdown the thread pool

Parameters:

  • wait (Boolean) (defaults to: true)

    Wait for pending tasks to complete



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/cabriolet/parallel.rb', line 251

def shutdown(wait: true)
  return unless @running

  if wait
    # Wait for queue to empty
    sleep 0.01 until @queue.empty?
  end

  # Send termination signals
  @size.times { @queue << :shutdown }

  # Wait for threads to finish
  @threads.each(&:join)
  @threads.clear
  @running = false
end

#startObject

Start the thread pool



231
232
233
234
235
236
237
238
# File 'lib/cabriolet/parallel.rb', line 231

def start
  return if @running

  @running = true
  @threads = Array.new(@size) do
    Thread.new { worker_loop }
  end
end

#submit { ... } ⇒ Object

Submit a task to the pool

Yields:

  • Task to execute



243
244
245
246
# File 'lib/cabriolet/parallel.rb', line 243

def submit(&block)
  start unless @running
  @queue << block
end