Class: Pwrake::TaskQueue
- Inherits:
-
Object
- Object
- Pwrake::TaskQueue
- Defined in:
- lib/pwrake/queue/task_queue.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#enable_steal ⇒ Object
Returns the value of attribute enable_steal.
Instance Method Summary collapse
- #_qstr(h, q) ⇒ Object
- #clear ⇒ Object
- #deq_impl(host_info = nil, turn = nil) ⇒ Object
- #deq_noaction_task(&block) ⇒ Object
-
#deq_task(&block) ⇒ Object
Dequeues tasks turn by turn, calling deq_turn for every non-empty turn (locality version).
- #deq_turn(turn, &block) ⇒ Object
- #drop_host(host_info) ⇒ Object
- #empty? ⇒ Boolean
-
#enq(tw) ⇒ Object
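Enqueues a task: nil tasks and tasks without actions go to the no-action queue; all others are passed to enq_body.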
- #enq_body(tw) ⇒ Object
- #enq_impl(tw) ⇒ Object
- #init_queue(group_map = nil) ⇒ Object
-
#initialize(hostinfo_by_id, group_map = nil) ⇒ TaskQueue
constructor
A new instance of TaskQueue.
- #inspect_q ⇒ Object
- #turn_empty?(turn) ⇒ Boolean
Constructor Details
#initialize(hostinfo_by_id, group_map = nil) ⇒ TaskQueue
Returns a new instance of TaskQueue.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/pwrake/queue/task_queue.rb', line 8
# Sets up the queue: stores the host table, picks the queue-array class named
# by the QUEUE_PRIORITY option (default "LIHR") and then calls init_queue.
#
# @param hostinfo_by_id [#each_value] host-info objects; stored unchanged in
#   @hostinfo_by_id
# @param group_map [Object, nil] forwarded unchanged to init_queue
# @raise [RuntimeError] when QUEUE_PRIORITY matches none of the known names
def initialize(hostinfo_by_id, group_map=nil)
  @enable_steal = true
  @q_no_action = NoActionQueue.new
  @hostinfo_by_id = hostinfo_by_id

  # NOTE(review): the extracted listing read `Rake.application.['QUEUE_PRIORITY']`,
  # which is not valid Ruby -- the accessor name was lost. `pwrake_options` is
  # the presumed original; confirm against lib/pwrake/queue/task_queue.rb.
  pri = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIHR"

  # The patterns are unanchored and case-insensitive; the first match wins.
  @array_class =
    case pri
    when /prio/i then PriorityQueueArray
    when /fifo/i then FifoQueueArray
    when /lifo/i then LifoQueueArray
    when /lihr/i then LifoHrfQueueArray
    when /prhr/i then PriorityHrfQueueArray
    when /rank/i then RankQueueArray
    else
      raise RuntimeError, "unknown option for QUEUE_PRIORITY: #{pri}"
    end

  Log.debug "@array_class=#{@array_class.inspect}"
  init_queue(group_map)
end
Instance Attribute Details
#enable_steal ⇒ Object
Returns the value of attribute enable_steal.
41 42 43 |
# File 'lib/pwrake/queue/task_queue.rb', line 41
# Reader for the @enable_steal instance variable.
#
# @return [Object] the current value of @enable_steal
def enable_steal
  @enable_steal
end
Instance Method Details
#_qstr(h, q) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/pwrake/queue/task_queue.rb', line 133
# Formats a one-line summary of a queue for debug output.
#
# @param h [Object] label interpolated at the start of the line
# @param q [#size, #first, #last] queue whose elements respond to #name
# @return [String] " <h>: size=<n> [...]\n", listing at most the first and
#   last element names
def _qstr(h,q)
  n = q.size
  members =
    if n == 0
      "[]\n"
    elsif n == 1
      "[#{q.first.name}]\n"
    elsif n == 2
      "[#{q.first.name}, #{q.last.name}]\n"
    else
      # Three or more entries: elide everything between first and last.
      "[#{q.first.name},..,#{q.last.name}]\n"
    end
  " #{h}: size=#{n} " + members
end
#clear ⇒ Object
121 122 123 124 125 |
# File 'lib/pwrake/queue/task_queue.rb', line 121
# Empties all three internal queues (no-action, input, no-input), in that order.
#
# @return [Object] whatever @q_no_input.clear returns
def clear
  [@q_no_action, @q_input].each(&:clear)
  @q_no_input.clear
end
#deq_impl(host_info = nil, turn = nil) ⇒ Object
115 116 117 118 119 |
# File 'lib/pwrake/queue/task_queue.rb', line 115
# Takes the next task, trying the no-action queue first, then the input
# queue, then the no-input queue. Later queues are only consulted when the
# earlier ones yield nothing.
#
# @param host_info [Object, nil] passed to #shift of the input and no-input queues
# @param turn [Object, nil] not used by this implementation
# @return [Object, nil] the first truthy value shifted, or the last queue's result
def deq_impl(host_info=nil, turn=nil)
  tw = @q_no_action.shift
  tw ||= @q_input.shift(host_info)
  tw ||= @q_no_input.shift(host_info)
  tw
end
#deq_noaction_task(&block) ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/pwrake/queue/task_queue.rb', line 64
# Drains the no-action queue, yielding each task to the caller's block.
# Logs the queue state first and each dequeued task name as it goes.
#
# @yieldparam tw [Object] a task taken from @q_no_action
# @return [nil]
def deq_noaction_task(&block)
  state = empty? ? " (empty)" : "\n#{inspect_q}"
  Log.debug "deq_noaction_task:" + state
  # Stops at the first falsy value returned by shift (i.e. an exhausted queue).
  while (task = @q_no_action.shift)
    Log.debug "deq_noaction: #{task.name}"
    yield(task)
  end
end
#deq_task(&block) ⇒ Object
Dequeues tasks turn by turn, calling deq_turn for every non-empty turn (locality version).
72 73 74 75 76 77 78 79 |
# File 'lib/pwrake/queue/task_queue.rb', line 72
# Dequeues tasks for every turn (locality version): logs the queue state,
# then calls deq_turn for each turn index in 0...@n_turn that is not empty.
#
# @yield forwarded to deq_turn, which yields (task, host_info, n_task_cores)
# @return [Integer] total number of tasks handed to the block, summed over
#   all turns (previously the accumulated count was discarded and the
#   receiver of Integer#times was returned instead)
def deq_task(&block) # locality version
  Log.debug "deq_task from:"+(empty? ? " (empty)" : "\n#{inspect_q}")
  queued = 0
  @n_turn.times do |turn|
    next if turn_empty?(turn)
    queued += deq_turn(turn,&block)
  end
  queued
end
#deq_turn(turn, &block) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/pwrake/queue/task_queue.rb', line 81
# Repeatedly sweeps over all hosts, handing one task per sweep to every host
# that reports idle cores, until a full sweep dispatches nothing or the turn
# becomes empty.
#
# @param turn [Object] turn identifier passed to turn_empty? and deq_impl
# @yieldparam tw [Object] the dequeued task
# @yieldparam host_info [Object] the host the task was dequeued for
# @yieldparam n_task_cores [Object] result of tw.n_used_cores(host_info)
# @return [Integer] number of tasks yielded during this call
# @raise [RuntimeError] when a task needs more cores than the host has idle
def deq_turn(turn,&block)
  total = 0
  dispatched = nil
  # Keep sweeping while the previous sweep dispatched at least one task.
  until dispatched == 0
    dispatched = 0
    @hostinfo_by_id.each_value do |host_info|
      idle = host_info.idle_cores
      next unless idle && idle > 0
      # Nothing left for this turn: leave the whole method.
      return total if turn_empty?(turn)

      tw = deq_impl(host_info,turn)
      next unless tw

      n_task_cores = tw.n_used_cores(host_info)
      Log.debug "deq: #{tw.name} n_task_cores=#{n_task_cores}"
      if host_info.idle_cores < n_task_cores
        m = "task.n_used_cores=#{n_task_cores} must be "+
            "<= host_info.idle_cores=#{host_info.idle_cores}"
        Log.fatal m
        raise RuntimeError,m
      end

      yield(tw,host_info,n_task_cores)
      dispatched += 1
      total += 1
    end
  end
  total
end
#drop_host(host_info) ⇒ Object
154 155 |
# File 'lib/pwrake/queue/task_queue.rb', line 154
# Hook invoked with a host; this implementation deliberately does nothing.
#
# @param host_info [Object] ignored
# @return [nil]
def drop_host(host_info)
  # intentionally empty
end
#empty? ⇒ Boolean
127 128 129 130 131 |
# File 'lib/pwrake/queue/task_queue.rb', line 127
# Tells whether every internal queue (no-action, input, no-input) is empty.
# Queues are checked in that order and checking stops at the first
# non-empty one.
#
# @return [Boolean] true only when all three queues report empty?
def empty?
  [@q_no_action, @q_input, @q_no_input].all?(&:empty?)
end
#enq(tw) ⇒ Object
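Enqueues a task: nil tasks and tasks without actions go to the no-action queue; all others are passed to enq_body.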
44 45 46 47 48 49 50 |
# File 'lib/pwrake/queue/task_queue.rb', line 44
# Enqueues a task. A nil task or a task whose actions list is empty is pushed
# onto the no-action queue; every other task is handed to enq_body.
#
# @param tw [Object, nil] task to enqueue; must respond to #actions unless nil
# @return [Object] result of the push or of enq_body
def enq(tw)
  return @q_no_action.push(tw) if tw.nil? || tw.actions.empty?

  enq_body(tw)
end
#enq_body(tw) ⇒ Object
52 53 54 |
# File 'lib/pwrake/queue/task_queue.rb', line 52
# Enqueues a task that has actions; this implementation simply delegates to
# enq_impl.
#
# @param tw [Object] task to enqueue
# @return [Object] whatever enq_impl returns
def enq_body(tw)
  enq_impl(tw)
end
#enq_impl(tw) ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/pwrake/queue/task_queue.rb', line 56
# Pushes the task onto the input queue when it reports an input file, and
# onto the no-input queue otherwise.
#
# @param tw [#has_input_file?] task to enqueue
# @return [Object] result of the chosen queue's #push
def enq_impl(tw)
  target = tw.has_input_file? ? @q_input : @q_no_input
  target.push(tw)
end
#init_queue(group_map = nil) ⇒ Object
35 36 37 38 39 |
# File 'lib/pwrake/queue/task_queue.rb', line 35
# Creates the input queue from the class stored in @array_class, creates a
# FIFO no-input queue, and sets the number of turns to 1.
#
# @param group_map [Object, nil] not used by this implementation
# @return [Integer] 1, the value assigned to @n_turn
def init_queue(group_map=nil)
  @q_input    = @array_class.new(0)
  @q_no_input = FifoQueueArray.new
  @n_turn     = 1
end
#inspect_q ⇒ Object
148 149 150 151 152 |
# File 'lib/pwrake/queue/task_queue.rb', line 148
# Builds a multi-line debug summary of the three queues by concatenating the
# _qstr output for "noaction", "input" and "no_input", in that order.
#
# @return [String] the concatenated summaries
def inspect_q
  labelled = [
    ["noaction", @q_no_action],
    ["input",    @q_input],
    ["no_input", @q_no_input]
  ]
  labelled.map { |label, queue| _qstr(label, queue) }.join
end
#turn_empty?(turn) ⇒ Boolean
111 112 113 |
# File 'lib/pwrake/queue/task_queue.rb', line 111
# Tells whether there is nothing to dequeue for the given turn. This
# implementation ignores the turn and answers with empty?.
#
# @param turn [Object] ignored
# @return [Boolean] the result of empty?
def turn_empty?(turn)
  empty?
end