Module: Capistrano::Configuration::Extensions::Actions::Invocation
- Included in:
- Capistrano::Configuration
- Defined in:
- lib/capistrano/configuration/extensions/actions/invocation.rb
Defined Under Namespace
Classes: BlockProxy
Instance Method Summary collapse
- #parallelize(thread_count = nil) {|proxy| ... } ⇒ Object
- #rollback_all_threads(threads) ⇒ Object
- #run_in_threads(blocks) ⇒ Object
- #run_parallelize_loop(proxy, thread_count) ⇒ Object
- #wait_for(threads) ⇒ Object
Instance Method Details
#parallelize(thread_count = nil) {|proxy| ... } ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/capistrano/configuration/extensions/actions/invocation.rb', line 19 def parallelize(thread_count = nil) set :parallelize_thread_count, 10 unless respond_to?(:parallelize_thread_count) proxy = BlockProxy.new yield proxy logger.info "Running #{proxy.blocks.size} threads in chunks of #{thread_count || parallelize_thread_count}" run_parallelize_loop(proxy, thread_count || parallelize_thread_count) end |
#rollback_all_threads(threads) ⇒ Object
69 70 71 72 73 74 75 76 77 78 |
# File 'lib/capistrano/configuration/extensions/actions/invocation.rb', line 69 def rollback_all_threads(threads) Thread.new do threads.select {|t| !t[:rolled_back]}.each do |thread| Thread.current[:rollback_requests] = thread[:rollback_requests] rollback! end end.join rollback! # Rolling back main thread too true end |
#run_in_threads(blocks) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/capistrano/configuration/extensions/actions/invocation.rb', line 43 def run_in_threads(blocks) blocks.collect do |blk| thread = Thread.new do logger.info "Running block in background thread" blk.call end begin thread.run rescue ThreadError thread[:exception_raised] = $! end thread end end |
#run_parallelize_loop(proxy, thread_count) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/capistrano/configuration/extensions/actions/invocation.rb', line 29 def run_parallelize_loop(proxy, thread_count) batch = 1 all_threads = [] proxy.blocks.each_slice(thread_count) do |chunk| logger.info "Running batch number #{batch}" threads = run_in_threads(chunk) all_threads << threads wait_for(threads) rollback_all_threads(all_threads.flatten) and return if threads.any? {|t| t[:rolled_back] || t[:exception_raised]} batch += 1 end all_threads end |
#wait_for(threads) ⇒ Object
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/capistrano/configuration/extensions/actions/invocation.rb', line 58 def wait_for(threads) threads.each do |thread| begin thread.join rescue logger.important "Subthread failed: #{$!.}" thread[:exception_raised] = $! end end end |