Class: Myreplicator::Parallelizer

Inherits:
Object
  • Object
show all
Defined in:
lib/transporter/parallelizer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Parallelizer

Returns a new instance of Parallelizer.



16
17
18
19
20
21
22
# File 'lib/transporter/parallelizer.rb', line 16

# Sets up an empty job queue and an empty thread list, then reads the
# trailing options hash from +args+.
#
# Recognised options:
#   :max_threads - thread limit stored in @max_threads; 10 is used when the
#                  key is missing or its value is nil
#   :klass       - class name, turned into a constant with +constantize+
#                  and stored in @klass
def initialize(*args)
  opts = args.extract_options!
  requested_limit = opts[:max_threads]

  @queue = Queue.new
  @threads = []
  @max_threads =
    if requested_limit.nil?
      10
    else
      requested_limit
    end
  @klass = opts[:klass].constantize
end

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



14
15
16
# File 'lib/transporter/parallelizer.rb', line 14

# Reader for the @queue instance variable (assigned a new Queue in the
# constructor).
def queue
  @queue
end

Instance Method Details

#done? ⇒ Boolean

Returns true when all jobs are processed and no thread is running

Returns:

  • (Boolean)


91
92
93
94
95
96
# File 'lib/transporter/parallelizer.rb', line 91

# Tells whether all work is finished: true only when the job queue holds
# no entries and the thread list is empty, false otherwise.
def done?
  @queue.empty? && @threads.empty?
end

#manage_threads ⇒ Object

Clears dead threads, freeing the thread pool for more jobs. Exits when no more threads are left.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/transporter/parallelizer.rb', line 63

# Starts a background thread that repeatedly prunes finished workers from
# @threads, and returns that thread.
#
# A worker counts as finished when its :thread_state thread-local equals
# "done" or when Thread#status is falsy (the thread is no longer alive).
# After each pruning pass the loop sets @done when done? reports true;
# otherwise it prints a notice and sleeps two seconds before looking again.
# The loop ends as soon as @threads is found empty.
def manage_threads
  Thread.new do
    until @threads.empty?
      # Collect first, delete afterwards, so the list is not modified
      # while it is being walked.
      finished = @threads.select do |worker|
        worker[:thread_state] == "done" || !worker.status
      end
      finished.each { |worker| @threads.delete(worker) }

      if done?
        @done = true
      else
        puts "Sleeping for 2"
        sleep 2 # give further workers a chance to be spawned
      end
    end
  end
end

#run ⇒ Object

Runs while there are jobs in the queue. Waits for a second and checks for available threads. Exits when all jobs are allocated in threads.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/transporter/parallelizer.rb', line 29

# Drains the job queue, running each job in its own thread while keeping at
# most @max_threads workers in the pool, then blocks until the manager
# thread returned by manage_threads has finished.
#
# Each queue entry is expected to respond to [:params] and [:block]; the
# block is evaluated with instance_exec on a fresh @klass instance and
# receives the params as its argument. Every worker records its progress in
# the :thread_state thread-local ("running", then "done").
#
# When the pool is full the manager thread is started (once) so finished
# workers get removed, and the loop waits one second before checking again.
def run
  @done = false
  @manager_running = false
  reaper = nil

  # A limit below 1 could never admit a worker and would stall the loop,
  # so always allow at least one thread.
  pool_limit = [@max_threads, 1].max

  while @queue.size > 0
    # Strict comparison: the pool must never exceed the configured limit.
    if @threads.size < pool_limit
      @threads << Thread.new(@queue.pop) do |job|
        Thread.current[:thread_state] = "running"
        @klass.new.instance_exec(job[:params], &job[:block])
        Thread.current[:thread_state] = "done"
      end
    else
      unless @manager_running
        reaper = manage_threads
        @manager_running = true
      end
      sleep 1
    end
  end

  # Start the manager if the pool never filled up
  reaper = manage_threads unless @manager_running

  # Wait for the manager thread to finish before returning
  reaper.join
end