Class: Myreplicator::Parallelizer
- Inherits: Object
  - Object
    - Myreplicator::Parallelizer
- Defined in: lib/transporter/parallelizer.rb
Instance Attribute Summary
- #queue ⇒ Object
  Returns the value of attribute queue.
Instance Method Summary
- #done? ⇒ Boolean
  Returns true when all jobs are processed and no thread is running.
- #initialize(*args) ⇒ Parallelizer (constructor)
  Returns a new instance of Parallelizer.
- #manage_threads ⇒ Object
  Clears dead threads, freeing the thread pool for more jobs. Exits when no more threads are left.
- #run ⇒ Object
  Runs while there are jobs in the queue. When the pool is full, waits for a second and checks again for available threads. Exits when all jobs are allocated to threads.
Constructor Details
#initialize(*args) ⇒ Parallelizer
Returns a new instance of Parallelizer.
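# File 'lib/transporter/parallelizer.rb', line 16
def initialize *args
  # The local variable name and the call after `args.` were dropped in
  # extraction; `options = args.extract_options!` is a reconstruction.
  options = args.extract_options!
  @queue = Queue.new
  @threads = []
  @max_threads = options[:max_threads].nil? ? 10 : options[:max_threads]
  @klass = options[:klass].constantize
end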
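The constructor reads `:max_threads` (defaulting to 10) and `:klass` from its arguments. The following is a minimal sketch of that option handling in plain Ruby, not the library's own code: `SketchParallelizer` and `Worker` are hypothetical names, a manual trailing-Hash check stands in for whatever options helper the original uses, and `Object.const_get` stands in for `constantize`.

```ruby
# Hypothetical stand-in for the constructor's option handling.
# Assumption: plain Ruby only, so Object.const_get replaces ActiveSupport's
# String#constantize and a manual Hash check replaces any options helper.
class Worker; end # hypothetical job context class

class SketchParallelizer
  attr_reader :queue, :max_threads, :klass

  def initialize(*args)
    # Treat a trailing Hash as the options hash; otherwise use no options.
    options = args.last.is_a?(Hash) ? args.pop : {}
    @queue = Queue.new
    @threads = []
    @max_threads = options[:max_threads].nil? ? 10 : options[:max_threads]
    @klass = Object.const_get(options[:klass])
  end
end

default_pool = SketchParallelizer.new(klass: "Worker")
small_pool = SketchParallelizer.new(klass: "Worker", max_threads: 2)
```

Here `default_pool` falls back to the default of 10 threads, while `small_pool` uses the explicit limit. In both cases the class name is passed as a String and resolved to a constant.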
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/transporter/parallelizer.rb', line 14
def queue
  @queue
end
Instance Method Details
#done? ⇒ Boolean
Returns true when all jobs are processed and no thread is running.
# File 'lib/transporter/parallelizer.rb', line 91
def done?
  if @queue.size == 0 && @threads.size == 0
    return true
  end
  return false
end
#manage_threads ⇒ Object
Clears dead threads, freeing the thread pool for more jobs. Exits when no more threads are left.
# File 'lib/transporter/parallelizer.rb', line 63
def manage_threads
  Thread.new do
    while(@threads.size > 0)
      done = []
      @threads.each do |t|
        done << t if t[:thread_state] == "done" || !t.status
        # puts t.object_id.to_s + "--" + t.status.to_s + "--" + t.to_s
        # raise "Nil Thread State" if t[:thread_state].nil?
      end
      done.each{|d| @threads.delete(d)} # Clear dead threads

      # If no more jobs are left, mark done
      if done?
        @done = true
      else
        puts "Sleeping for 2"
        sleep 2 # Wait for more threads to spawn
      end
    end
  end
end
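The reaper treats a thread as finished when its thread-local `:thread_state` is "done" or when `Thread#status` is falsy. `Thread#status` returns `false` for a thread that ended normally and `nil` for one that ended with an exception, so `!t.status` covers both cases. The sketch below illustrates that check in isolation; the `finished?` helper and the three demo threads are hypothetical and not part of the class.

```ruby
# Hypothetical helper mirroring the reaper's test for a finished thread.
def finished?(thread)
  thread[:thread_state] == "done" || !thread.status
end

# Ends normally and marks itself done, like a successful job.
ok = Thread.new { Thread.current[:thread_state] = "done" }

# Ends with an exception, so it never reaches the "done" marker.
failed = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise "job failed"
end

# Still alive: sleeps until killed.
sleeper = Thread.new { sleep }

ok.join
begin
  failed.join # join re-raises the exception raised inside the thread
rescue RuntimeError
  # expected: the job raised
end

threads = [ok, failed, sleeper]
done = threads.select { |t| finished?(t) }
threads -= done # drop finished threads, keep the live one

sleeper.kill
sleeper.join
```

After the sweep, `done` holds the two terminated threads and `threads` holds only the one that was still alive, which matches how the reaper frees slots in `@threads`.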
#run ⇒ Object
Runs while there are jobs in the queue. When the pool is full, waits for a second and checks again for available threads. Exits when all jobs are allocated to threads.
# File 'lib/transporter/parallelizer.rb', line 29
def run
  @done = false
  @manager_running = false
  reaper = nil

  while @queue.size > 0
    if @threads.size <= @max_threads
      @threads << Thread.new(@queue.pop) do |proc|
        Thread.current[:thread_state] = "running"
        @klass.new.instance_exec(proc[:params], &proc[:block])
        Thread.current[:thread_state] = "done"
      end
    else
      unless @manager_running
        reaper = manage_threads
        @manager_running = true
      end
      sleep 1
    end
  end

  # Run manager if thread size never reached max
  reaper = manage_threads unless @manager_running

  # Waits until all threads are completed
  # Before exiting
  reaper.join
end
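Because `run` executes `@klass.new.instance_exec(proc[:params], &proc[:block])`, each queue entry appears to be a Hash with a `:params` value and a `:block` callable. The block runs with a fresh `@klass` instance as `self` and receives the params as its argument. The sketch below shows that job shape on its own, without the library; the `Exporter` class, the `results` queue, and the table names are hypothetical.

```ruby
# Hypothetical job context; blocks run with an instance of this class as self.
class Exporter
  def export(table)
    "exported #{table}"
  end
end

results = Queue.new # thread-safe collector, used only for this demo
jobs = Queue.new

%w[users orders].each do |table|
  jobs << {
    params: table,
    # Inside the block, self is an Exporter, so export can be called directly.
    block: proc { |name| results << export(name) }
  }
end

# One thread per job, mirroring how run hands each popped job to a thread.
threads = []
until jobs.empty?
  threads << Thread.new(jobs.pop) do |job|
    Exporter.new.instance_exec(job[:params], &job[:block])
  end
end
threads.each(&:join)

output = []
output << results.pop until results.empty?
output.sort!
```

Passing the job as an argument to `Thread.new` gives each thread its own reference, which is why the listing pops from the queue in the calling thread. Note also that, as listed, the `<=` comparison in `run` lets the pool reach `@max_threads + 1` threads before it starts waiting.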