Class: JobRunner
Overview
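Runs many jobs in parallel threads and yields each result as soon as its job finishes, so results arrive in completion order rather than in the order the jobs were added. (NOTE: A JobRunner can be run multiple times; on each run every block is executed again.)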
Examples:
# Configure the runner inside the constructor block: queue three jobs, then
# print each result as it is taken off the result queue.
JobRunner.new do |jr|
jr.add { 3 }
jr.add { sleep 0.1; 2 }
jr.add { sleep 0.2; 1 }
# each_result starts the jobs itself if they are not already running.
jr.each_result do |result|
p result
end
end
# Alternative form: pass the jobs to the constructor as procs.
jr = JobRunner.new(
proc { 1 },
proc { 2 },
proc { 3 }
)
# The runner is reusable: each call to each_result executes every job again.
2.times do
jr.each_result { |result| p result }
end
Instance Method Summary
- #add(&block) ⇒ Object
- #dmsg(msg) ⇒ Object
- #each_result ⇒ Object
- #go! ⇒ Object
- #initialize(*blocks, debug: false) ⇒ JobRunner (constructor): A new instance of JobRunner.
- #reap! ⇒ Object
Constructor Details
#initialize(*blocks, debug: false) ⇒ JobRunner
Returns a new instance of JobRunner.
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/epitools/job_runner.rb', line 29
#
# Creates a runner and registers its initial jobs.
#
# Jobs can be supplied either as positional callables (each one is handed to
# #add as a block) or by configuring the new instance inside a block. When at
# least one truthy positional argument is given, the block is not yielded to.
#
# @param blocks [Array<Proc>] jobs to queue immediately
# @param debug [Boolean] when true, #dmsg prints timestamped trace messages
# @yieldparam runner [JobRunner] the new instance (only when no jobs were passed)
def initialize(*blocks, debug: false)
  @debug   = debug
  @started = false
  @jobs    = []
  @threads = []
  @results = Thread::Queue.new

  if blocks.none?
    yield self if block_given?
  else
    blocks.each { |job| add(&job) }
  end
end
Instance Method Details
#add(&block) ⇒ Object
47 48 49 50 |
# File 'lib/epitools/job_runner.rb', line 47
#
# Queues a job to be executed the next time the runner is started.
#
# A missing block is rejected immediately: otherwise +nil+ would be queued and
# only fail later, inside a worker thread, when the job is called.
#
# @yield the work to perform
# @return [Array<Proc>] the list of queued jobs
# @raise [ArgumentError] if no block is given
def add(&block)
  raise ArgumentError, "JobRunner#add requires a block" unless block

  dmsg("added job #{block}")
  @jobs << block
end
#dmsg(msg) ⇒ Object
43 44 45 |
# File 'lib/epitools/job_runner.rb', line 43
#
# Prints a timestamped debug message, but only when the runner was created
# with debugging enabled.
#
# @param msg [#to_s] text to print
# @return [nil]
def dmsg(msg)
  return unless @debug

  puts "[#{Time.now}] #{msg}"
end
#each_result ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/epitools/job_runner.rb', line 78
#
# Yields every job result as soon as it is available, starting the jobs first
# if they are not already running. When the run is over the runner is reset
# so it can be started again.
#
# A watcher thread waits for all workers to finish and then enqueues a unique
# end-of-run marker. This lets the consumer block on the queue without polling
# and still terminate when there are no jobs at all, when a job dies without
# producing a result, or when the last worker is still shutting down after its
# result has already been consumed.
#
# NOTE: leaving the block early (break/raise) skips the reset below and leaves
# unconsumed items in the queue.
#
# @yieldparam result [Object] the return value of one job
# @return [false]
def each_result
  go! unless @started

  finished = Object.new # identity-compared marker; cannot collide with a job result
  workers  = @threads.dup

  Thread.new do
    workers.each do |worker|
      begin
        worker.join
      rescue Exception # rubocop:disable Lint/RescueException
        # Thread#join re-raises whatever terminated the worker. Only the fact
        # that the worker is finished matters here, so the error is ignored.
        nil
      end
    end
    # Every worker has terminated, so all results are already queued ahead of
    # the marker.
    @results << finished
  end

  loop do
    result = @results.pop
    break if result.equal?(finished)

    yield result
  end

  reap!
  @started = false
end
#go! ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/epitools/job_runner.rb', line 61
#
# Starts one thread per queued job. Each thread calls its job and pushes the
# return value onto the result queue.
#
# The debug message reports the number of queued jobs; no threads exist yet
# at that point, so the thread count would be misleading.
#
# NOTE: a job that raises terminates its thread without pushing a result.
#
# @return [Array<Proc>] the list of jobs that were started
# @raise [RuntimeError] if the runner has already been started
def go!
  raise "Error: already started" if @started

  dmsg("starting #{@jobs.size} jobs")
  @started = true

  @jobs.each do |job|
    dmsg("adding #{job}")
    @threads << Thread.new do
      @results << job.call
      dmsg("job #{job} complete")
    end
  end
end
#reap! ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/epitools/job_runner.rb', line 52
#
# Drops every finished thread from the runner's thread list, logging what it
# does through #dmsg.
#
# @return [Array<Thread>, nil] the remaining (still alive) threads, or nil
#   when there was nothing to reap
def reap!
  if @threads.empty?
    dmsg("reap failed: no threads")
  else
    dmsg("reaping #{@threads.size} threads")
    @threads.delete_if { |thread| !thread.alive? }
  end
end