Module: HybridPlatformsConductor::ParallelThreads
- Included in:
- HpcPlugins::Cmdb::HostIp, HpcPlugins::Cmdb::HostKeys, NodesHandler, ServicesHandler, TestsRunner
- Defined in:
- lib/hybrid_platforms_conductor/parallel_threads.rb
Overview
Provide utilities to handle parallel threads
Instance Method Summary collapse
-
#for_each_element_in(list, parallel: false, nbr_threads_max: nil, progress: 'Process') ⇒ Object
Iterate over a list of objects.
Instance Method Details
#for_each_element_in(list, parallel: false, nbr_threads_max: nil, progress: 'Process') ⇒ Object
Iterate over a list of objects. Provide a mechanism to multithread this iteration (in such case the iterating code has to be thread-safe). In case of multithreaded run, a progress bar is being displayed.
- Parameters
-
list (Array<Object>): List of objects to iterate over
-
parallel (Boolean): Iterate in a multithreaded way? [default: false]
-
nbr_threads_max (Integer or nil): Maximum number of threads to be used in case of parallel, or nil for no limit [default: nil]
-
progress (String or nil): Name of a progress bar to follow the progression, or nil for no progress bar [default: ‘Progress’]
-
Proc: The code called for each node being iterated on.
- Parameters
-
element (Object): The object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/hybrid_platforms_conductor/parallel_threads.rb', line 20 def for_each_element_in(list, parallel: false, nbr_threads_max: nil, progress: 'Process') if parallel # Threads to wait for threads_to_join = [] # Spread elements evenly among the threads. # Use a shared pool of elements to be handled by threads. pools = { to_process: list.dup, processing: [], processed: [] } nbr_total = list.size # Protect access to the pools using a mutex pools_semaphore = Mutex.new # Spawn the threads, each one responsible for handling its list (nbr_threads_max.nil? || nbr_threads_max > nbr_total ? nbr_total : nbr_threads_max).times do threads_to_join << Thread.new do # As exceptions are logged anyway whatever the Thread setting is, only turn it on for debug. # That will keep tests execution cleaner. Thread.current.report_on_exception = log_debug? begin loop do # Modify the list while processing it, so that reporting can be done. element = nil pools_semaphore.synchronize do element = pools[:to_process].shift pools[:processing] << element unless element.nil? end break if element.nil? begin yield element ensure pools_semaphore.synchronize do pools[:processing].delete(element) pools[:processed] << element end end end rescue log_error "Unhandled exception occurred in thread #{Thread.current.object_id}: #{$!}\n#{$!.backtrace.join("\n")}" raise end end end if progress # Here the main thread just reports progression nbr_to_process = nil nbr_processing = nil nbr_processed = nil (nbr_total, name: progress) do || loop do pools_semaphore.synchronize do nbr_to_process = pools[:to_process].size nbr_processing = pools[:processing].size nbr_processed = pools[:processed].size end .title = "Queue: #{nbr_to_process} - Processing: #{nbr_processing} - Done: #{nbr_processed} - Total: #{nbr_total}" .progress = nbr_processed break if nbr_processed == nbr_total sleep 0.5 end end end # Wait for threads to be joined threads_to_join.each do |thread| thread.join end else # Execute synchronously list.each do |element| yield element end end end |