Class: Pwrake::TaskQueue
- Inherits: Object
  - Object
    - Pwrake::TaskQueue
- Defined in:
- lib/pwrake/queue/task_queue.rb
Class Method Summary collapse
Instance Method Summary collapse
- #clear ⇒ Object
- #deq_noaction_task(&block) ⇒ Object
- #deq_reserve(&block) ⇒ Object
- #deq_task(&block) ⇒ Object
- #deq_turn(turn, &block) ⇒ Object
- #drop_host(host_info) ⇒ Object
- #empty? ⇒ Boolean
- #enq(tw) ⇒ Object
- #initialize(queue_class, hostinfo_by_id, group_map = nil) ⇒ TaskQueue (constructor): A new instance of TaskQueue.
- #inspect_q ⇒ Object
Constructor Details
#initialize(queue_class, hostinfo_by_id, group_map = nil) ⇒ TaskQueue
Returns a new instance of TaskQueue.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/pwrake/queue/task_queue.rb', line 9
#
# Sets up the queue front-end: a no-action queue, a per-host reservation
# table, enqueue/dequeue counters, and the backing queue instance whose
# class is looked up by name under the Pwrake namespace.
#
# @param queue_class [String, Symbol] constant name resolved with Pwrake.const_get
# @param hostinfo_by_id [Hash] host id => host info; every host info must respond to #ncore
# @param group_map [Object, nil] passed through unchanged to the backing queue constructor
# @raise [RuntimeError] when QUEUE_PRIORITY is none of fifo / lifo / lihr (case-insensitive)
def initialize(queue_class, hostinfo_by_id, group_map=nil)
  @queue_class = Pwrake.const_get(queue_class)
  @hostinfo_by_id = hostinfo_by_id
  @lock = Monitor.new
  @q_no_action = NoActionQueue.new
  @q_reserved = Hash.new
  @nenq = 0
  @ndeq = 0

  # Give the reservation hash queue-like #first / #last accessors that
  # answer the stored value instead of a [key, value] pair.
  def @q_reserved.first
    super.last
  end
  def @q_reserved.last
    self[keys.last]
  end

  # NOTE(review): the listing read `Rake.application.['QUEUE_PRIORITY']`,
  # which is not valid Ruby (the method name was lost). `pwrake_options` is
  # assumed to be the intended accessor -- confirm against the repository.
  pri = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIFO"
  case pri
  when /^fifo$/i
    @array_class = FifoQueueArray
  when /^lifo$/i
    @array_class = LifoQueueArray
  when /^lihr$/i
    @array_class = LifoHrfQueueArray
  else
    # Interpolate rather than concatenate so a non-String option value still
    # produces this RuntimeError instead of a TypeError from String#+.
    raise RuntimeError, "unknown option for QUEUE_PRIORITY: #{pri}"
  end
  Log.debug "@array_class=#{@array_class.inspect}"

  # median number of cores (for an even host count: the two middle values
  # are added and divided by 2)
  a = @hostinfo_by_id.map{|id,host_info| host_info.ncore}.sort
  n = a.size
  @median_core = (n%2==0) ? (a[n/2-1]+a[n/2])/2 : a[(n-1)/2]
  @q = @queue_class.new(hostinfo_by_id,@array_class,@median_core,group_map)
end
Class Method Details
._qstr(h, q) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/pwrake/queue/task_queue.rb', line 130
#
# Renders a one-line summary of a queue: its label, its size, and the names
# of its first and last elements (with ".." between them when there are more
# than two).
#
# @param h [Object] label interpolated at the start of the line
# @param q [#size, #first, #last] queue whose elements respond to #name
# @return [String] the summary, terminated by a newline
def self._qstr(h, q)
  count = q.size
  listing =
    case count
    when 0 then ""
    when 1 then "#{q.first.name}"
    when 2 then "#{q.first.name}, #{q.last.name}"
    else "#{q.first.name}, .., #{q.last.name}"
    end
  " #{h}: size=#{count} [#{listing}]\n"
end
.first ⇒ Object
17 18 19 |
# File 'lib/pwrake/queue/task_queue.rb', line 17 def @q_reserved.first super.last end |
.last ⇒ Object
20 21 22 |
# File 'lib/pwrake/queue/task_queue.rb', line 20 def @q_reserved.last self[keys.last] end |
Instance Method Details
#clear ⇒ Object
118 119 120 121 122 |
# File 'lib/pwrake/queue/task_queue.rb', line 118
#
# Empties all three internal containers: the no-action queue, the
# reservation table, and finally the backing queue.
#
# @return [Object] whatever the backing queue's #clear returns
def clear
  [@q_no_action, @q_reserved].each(&:clear)
  @q.clear
end
#deq_noaction_task(&block) ⇒ Object
75 76 77 78 79 80 |
# File 'lib/pwrake/queue/task_queue.rb', line 75
#
# Drains the no-action queue: shifts entries off one at a time, yields each
# to the caller's block and bumps the dequeue counter. Stops as soon as
# #shift hands back nil or false.
#
# @yieldparam tw [Object] the entry just removed from the no-action queue
# @return [nil]
def deq_noaction_task(&block)
  tw = @q_no_action.shift
  while tw
    yield(tw)
    @ndeq += 1
    tw = @q_no_action.shift
  end
end
#deq_reserve(&block) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/pwrake/queue/task_queue.rb', line 82
#
# Walks the reservation table and releases every reserved task whose host
# now reports at least as many idle cores as the task asks for. A released
# entry is removed from the table, logged, yielded and counted.
#
# @yieldparam tw [Object] the reserved task being released
# @yieldparam host_info [Object] the host it was reserved on
# @yieldparam n_core [Integer] cores reported by tw.use_cores(host_info)
# @return [Hash] the reservation table
def deq_reserve(&block)
  @q_reserved.each_pair do |host, task|
    # A host that reports no idle-core figure is treated as having none.
    idle = host.idle_cores || 0
    needed = task.use_cores(host)
    next unless idle >= needed

    @q_reserved.delete(host)
    Log.debug "deq_reserve: #{task.name} n_use_cores=#{needed}"
    yield(task, host, needed)
    @ndeq += 1
  end
end
#deq_task(&block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/pwrake/queue/task_queue.rb', line 56
#
# Main dequeue entry point. Under the queue lock it logs and resets the
# enqueue counter, drains the no-action queue, releases satisfiable
# reservations, then (after telling the backing queue a dequeue pass is
# starting) runs one dequeue pass per turn. Finally it logs and resets the
# dequeue counter.
#
# @yield forwarded to the three dequeue helpers; see each for its arguments
# @return [Integer, nil] 0 when anything was dequeued in this call, else nil
def deq_task(&block)
  @lock.synchronize do
    if @nenq.positive?
      head = "deq_task nenq=#{@nenq}:"
      tail = empty? ? " (empty)" : "\n" + inspect_q
      Log.debug head + tail
      @nenq = 0
    end

    deq_noaction_task(&block)
    deq_reserve(&block)

    @q.deq_start
    @q.turns.each { |turn| deq_turn(turn, &block) } unless @q.empty?

    if @ndeq.positive?
      head = "deq_task ndeq=#{@ndeq}:"
      tail = empty? ? " (empty)" : "\n" + inspect_q
      Log.debug head + tail
      @ndeq = 0
    end
  end
end
#deq_turn(turn, &block) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/pwrake/queue/task_queue.rb', line 95
#
# Runs dequeue passes over all hosts for one turn. In each pass every host
# with idle cores and no outstanding reservation is offered a task from the
# backing queue. If the host has enough idle cores the task is yielded;
# otherwise the host is reserved for that task. Passes repeat while the
# previous pass yielded at least one task, and the method returns at once
# when the backing queue reports the turn as empty.
#
# @param turn [Object] turn identifier passed to the backing queue
# @yieldparam tw [Object] the dequeued task
# @yieldparam host_info [Object] the host it was dequeued for
# @yieldparam n_core [Integer] cores reported by tw.use_cores(host_info)
# @return [nil]
def deq_turn(turn, &block)
  dispatched = 1 # primed so the first pass always runs
  while dispatched > 0
    dispatched = 0
    @hostinfo_by_id.each_value do |host|
      return if @q.turn_empty?(turn)

      idle = host.idle_cores || 0
      next if idle == 0 || @q_reserved[host]

      task = @q.deq_impl(host, turn)
      next unless task

      needed = task.use_cores(host)
      unless idle >= needed
        # Not enough room yet: park the task on this host.
        @q_reserved[host] = task
        Log.debug "reserve host: #{host.name} for #{task.name} (#{needed} cores)"
        next
      end

      Log.debug "deq: #{task.name} n_use_cores=#{needed}"
      yield(task, host, needed)
      dispatched += 1
      @ndeq += 1
    end
  end
end
#drop_host(host_info) ⇒ Object
151 152 153 |
# File 'lib/pwrake/queue/task_queue.rb', line 151 def drop_host(host_info) @q.drop_host(host_info) end |
#empty? ⇒ Boolean
124 125 126 127 128 |
# File 'lib/pwrake/queue/task_queue.rb', line 124
#
# Reports whether nothing is pending anywhere: the no-action queue, the
# reservation table and the backing queue are checked in that order, and
# checking stops at the first non-empty one.
#
# @return [Boolean]
def empty?
  [@q_no_action, @q_reserved, @q].all?(&:empty?)
end
#enq(tw) ⇒ Object
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pwrake/queue/task_queue.rb', line 45
#
# Adds a task under the queue lock. A nil task, or one whose #actions list
# is empty, goes to the no-action queue; anything else is handed to the
# backing queue. The enqueue counter is incremented either way.
#
# @param tw [Object, nil] task wrapper responding to #actions
# @return [Integer] the updated enqueue counter
def enq(tw)
  @lock.synchronize do
    without_action = tw.nil? || tw.actions.empty?
    without_action ? @q_no_action.push(tw) : @q.enq_impl(tw)
    @nenq += 1
  end
end