Class: Cabriolet::Parallel::ThreadPool
- Inherits: Object (Object → Cabriolet::Parallel::ThreadPool)
- Defined in: lib/cabriolet/parallel.rb
Overview
Thread pool for custom parallel operations
Instance Method Summary
- #initialize(size: Extractor::DEFAULT_WORKERS) ⇒ ThreadPool (constructor)
  A new instance of ThreadPool.
- #map(items) {|item| ... } ⇒ Array
  Execute tasks in parallel with automatic cleanup.
- #shutdown(wait: true) ⇒ Object
  Shut down the thread pool.
- #start ⇒ Object
  Start the thread pool.
- #submit { ... } ⇒ Object
  Submit a task to the pool.
Constructor Details
#initialize(size: Extractor::DEFAULT_WORKERS) ⇒ ThreadPool
Returns a new instance of ThreadPool.
    # File 'lib/cabriolet/parallel.rb', line 223
    def initialize(size: Extractor::DEFAULT_WORKERS)
      @size = size
      @queue = Queue.new
      @threads = []
      @running = false
    end
Instance Method Details
#map(items) {|item| ... } ⇒ Array
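Executes the block once per item in parallel and returns the results in input order. The pool is started if necessary and shut down once every item has been submitted and processed.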
    # File 'lib/cabriolet/parallel.rb', line 273
    def map(items)
      start
      results = []
      results_mutex = Mutex.new

      items.each_with_index do |item, index|
        submit do
          result = yield(item)
          results_mutex.synchronize do
            results[index] = result
          end
        end
      end

      shutdown(wait: true)
      results
    end
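The order-preserving pattern used by #map (each task writes its result to `results[index]` under a mutex) can be tried in isolation. The following is a minimal stand-alone sketch, not Cabriolet code: `ordered_parallel_map`, its `workers:` parameter, and the `:done` sentinel are hypothetical names chosen for illustration.

```ruby
# Hypothetical stand-alone helper; it is not part of Cabriolet.
def ordered_parallel_map(items, workers: 4)
  queue = Queue.new
  results = Array.new(items.size)
  results_mutex = Mutex.new

  # Enqueue each item together with its position in the input.
  items.each_with_index { |item, index| queue << [item, index] }
  # One sentinel per worker tells that worker to stop.
  workers.times { queue << :done }

  threads = Array.new(workers) do
    Thread.new do
      loop do
        job = queue.pop
        break if job == :done

        item, index = job
        value = yield(item)
        # Writing by index keeps the output aligned with the input,
        # regardless of which worker finishes first.
        results_mutex.synchronize { results[index] = value }
      end
    end
  end

  threads.each(&:join)
  results
end

squares = ordered_parallel_map([1, 2, 3, 4]) { |n| n * n }
```

Because every result is stored at its original index, `squares` matches the input order even when the tasks complete out of order.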
#shutdown(wait: true) ⇒ Object
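Shuts down the thread pool. With wait: true (the default), the call polls until the queue is empty before sending one :shutdown signal per worker. In either case it joins every worker thread before returning. Does nothing if the pool is not running.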
    # File 'lib/cabriolet/parallel.rb', line 251
    def shutdown(wait: true)
      return unless @running

      if wait
        # Wait for queue to empty
        sleep 0.01 until @queue.empty?
      end

      # Send termination signals
      @size.times { @queue << :shutdown }

      # Wait for threads to finish
      @threads.each(&:join)
      @threads.clear
      @running = false
    end
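The termination signals rely on Ruby's `Queue` being first-in, first-out: a sentinel pushed after the pending tasks is only seen once those tasks have been popped. The sketch below illustrates this with a single worker. The loop body is an assumption about how a worker might react to `:shutdown`, since `worker_loop` is not shown on this page.

```ruby
queue = Queue.new
log = []

# Assumed worker behavior: run tasks until the :shutdown sentinel arrives.
worker = Thread.new do
  loop do
    task = queue.pop
    break if task == :shutdown

    log << task.call
  end
end

# Three tasks are queued before the sentinel, so all three run first.
3.times { |i| queue << -> { i } }
queue << :shutdown

# join returns only after the worker has processed the sentinel.
worker.join
```

After `worker.join`, `log` holds the three task results in submission order, which is why joining the threads is enough to guarantee that queued work has completed.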
#start ⇒ Object
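Starts the thread pool by spawning one worker thread per configured slot (size). Does nothing if the pool is already running.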
    # File 'lib/cabriolet/parallel.rb', line 231
    def start
      return if @running

      @running = true
      @threads = Array.new(@size) do
        Thread.new { worker_loop }
      end
    end
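The `@running` guard makes #start idempotent. The following self-contained sketch mirrors that guard in a hypothetical `TinyPool` class. `TinyPool`, its `stop` method, and in particular its `worker_loop` are assumptions for illustration and are not taken from Cabriolet.

```ruby
# Hypothetical miniature pool used only to illustrate the start guard.
class TinyPool
  attr_reader :threads

  def initialize(size:)
    @size = size
    @queue = Queue.new
    @threads = []
    @running = false
  end

  # Same guard as in the listing above: a second call is a no-op.
  def start
    return if @running

    @running = true
    @threads = Array.new(@size) { Thread.new { worker_loop } }
  end

  # Simplified shutdown: one sentinel per worker, then join.
  def stop
    return unless @running

    @size.times { @queue << :shutdown }
    @threads.each(&:join)
    @running = false
  end

  private

  # Assumed worker body; the real worker_loop is not shown in this page.
  def worker_loop
    loop do
      task = @queue.pop
      break if task == :shutdown

      task.call
    end
  end
end

pool = TinyPool.new(size: 2)
pool.start
first = pool.threads.dup
pool.start # returns early, no new threads are spawned
same = pool.threads == first
pool.stop
```

Here `same` is true because the second `start` leaves the existing worker threads untouched.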
#submit { ... } ⇒ Object
Submits a task (the given block) to the pool's queue, starting the pool first if it is not running.
    # File 'lib/cabriolet/parallel.rb', line 243
    def submit(&block)
      start unless @running
      @queue << block
    end