Class: Pwrake::LocalityAwareQueue
Instance Attribute Summary
Attributes inherited from TaskQueue
#enable_steal
Instance Method Summary
collapse
Methods inherited from TaskQueue
#_qstr, #deq_noaction_task, #deq_task, #deq_turn, #enq, #enq_body, #initialize
Instance Method Details
#clear ⇒ Object
153
154
155
156
157
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 153
def clear
@q_no_action.clear
@q.each{|h,q| q.clear}
@q_remote.clear
end
|
#deq_impl(host_info, turn) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 59
def deq_impl(host_info, turn)
host = host_info.name
case turn
when 0
if t = @q_no_action.shift
Log.debug "deq_no_action task=#{t&&t.name} host=#{host}"
return t
elsif t = deq_locate(host_info,host_info)
Log.debug "deq_locate task=#{t&&t.name} host=#{host}"
return t
elsif t = @q_remote.shift(host_info)
Log.debug "deq_remote task=#{t&&t.name}"
return t
else
nil
end
when 1
if t = deq_steal(host_info)
Log.debug "deq_steal task=#{t&&t.name} host=#{host}"
return t
else
nil
end
end
end
|
#deq_locate(q_host, run_host) ⇒ Object
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 85
def deq_locate(q_host,run_host)
q = @q[q_host.id]
if q && !q.empty?
t = q.shift(run_host)
if t
t.assigned.each do |h|
if q_h = @q[h]
q_h.delete(t)
end
end
@size_q -= 1
end
return t
else
nil
end
end
|
#deq_steal(host_info) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 103
def deq_steal(host_info)
max_host = nil
max_num = 0
@q_group[host_info.id].each do |qg|
qg.each do |h,a|
if !a.empty? d = a.size
if d > max_num
max_host = h
max_num = d
end
end
end
if max_num > 0
max_info = @hostinfo_by_id[max_host]
Log.debug "deq_steal max_host=#{max_info.name} max_num=#{max_num}"
t = host_info.steal_phase{|h| deq_locate(max_info,h)}
return t if t
end
end
nil
end
|
#drop_host(host_info) ⇒ Object
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 165
def drop_host(host_info)
hid = host_info.id
if q_drop = @q.delete(hid)
n_move = 0
q_size = q_drop.size
while t = q_drop.shift
assigned_other = false
t.assigned.each do |h|
if h != hid && @q[h]
assigned_other = true
break
end
end
if !assigned_other
@size_q -= 1
@q_remote.push(t)
n_move += 1
end
end
Log.debug "LAQ#drop_host: host=#{host_info.name} q.size=#{q_size} n_move=#{n_move}"
end
end
|
#empty? ⇒ Boolean
159
160
161
162
163
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 159
def empty?
@size_q == 0 &&
@q_no_action.empty? &&
@q_remote.empty?
end
|
#enq_impl(t) ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 27
def enq_impl(t)
hints = t && t.suggest_location
Log.debug "enq #{t.name} hints=#{hints.inspect}"
if hints.nil? || hints.empty?
@q_remote.push(t)
else
kv = {}
hints.each do |h|
HostMap.ipmatch_for_name(h).each{|id| kv[id] = true}
end
if !kv.empty?
kv.each_key do |id|
t.assigned.push(id)
@q[id].push(t)
end
@size_q += 1
else
@q_remote.push(t)
end
end
@last_enq_time = Time.now
end
|
#init_queue(group_map = nil) ⇒ Object
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 5
def init_queue(group_map=nil)
@size_q = 0
@q = {}
@hostinfo_by_id.each do |id,h|
@q[id] = @array_class.new(h.ncore)
end
@q_group = {}
group_map ||= {1=>@hostinfo_by_id.map{|id,h| id}}
group_map.each do |gid,ary|
q1 = {} q2 = @q.dup ary.each{|hid| q1[hid] = q2.delete(hid)}
a = [q1,q2]
ary.each{|hid| @q_group[hid] = a}
end
@q_remote = @array_class.new(0)
@disable_steal = Rake.application.pwrake_options['DISABLE_STEAL']
@last_enq_time = Time.now
@n_turn = @disable_steal ? 1 : 2
end
|
#inspect_q ⇒ Object
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 128
def inspect_q
s = _qstr("noaction",@q_no_action)
if @size_q == 0
n = @q.size
else
n = 0
@q.each do |h,q|
if q.size > 0
hinfo = @hostinfo_by_id[h]
if hinfo
s << _qstr(hinfo.name,q)
else
s << _qstr("(#{hinfo.inspect})",q)
end
else
n += 1
end
end
end
s << _qstr("local*#{n}",[]) if n > 0
s << _qstr("remote",@q_remote)
s << "@size_q=#{@size_q}"
s
end
|
#turn_empty?(turn) ⇒ Boolean
50
51
52
53
54
55
56
57
|
# File 'lib/pwrake/queue/locality_aware_queue.rb', line 50
def turn_empty?(turn)
case turn
when 0
@q_no_action.empty? && @size_q == 0 && @q_remote.empty?
when 1
@size_q == 0
end
end
|