Class: Wakame::MasterManagers::LockQueue
- Inherits:
-
Object
- Object
- Wakame::MasterManagers::LockQueue
- Defined in:
- lib/wakame/master_managers/action_manager.rb
Instance Method Summary collapse
- #clear_resource(resource) ⇒ Object
-
#initialize ⇒ LockQueue
constructor
A new instance of LockQueue.
- #inspect ⇒ Object
- #quit(id) ⇒ Object
- #reset ⇒ Object
- #set(resource, id) ⇒ Object
- #test(id) ⇒ Object
- #wait(id, tout = 60*30) ⇒ Object
Constructor Details
#initialize ⇒ LockQueue
Returns a new instance of LockQueue.
215 216 217 218 219 220 221 222 223 |
# File 'lib/wakame/master_managers/action_manager.rb', line 215 def initialize() @locks = {} @id2res = {} @self_m = ::Mutex.new @queue_by_thread = {} @qbt_m = ::Mutex.new end |
Instance Method Details
#clear_resource(resource) ⇒ Object
290 291 |
# File 'lib/wakame/master_managers/action_manager.rb', line 290 def clear_resource(resource) end |
#inspect ⇒ Object
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/wakame/master_managers/action_manager.rb', line 293 def inspect output = @locks.collect { |k, lst| [k, lst].flatten } return "" if output.empty? # Table display maxcolws = (0..(output.size)).zip(*output).collect { |i| i.shift; i.map!{|i| (i.nil? ? "" : i).length }.max } maxcol = maxcolws.size maxcolws.reverse.each { |i| break if i > 0 maxcol -= 1 } textrows = output.collect { |x| buf="" maxcol.times { |n| buf << "|" + (x[n] || "").ljust(maxcolws[n]) } buf << "|" } "+" + (["-"] * (textrows[0].length - 2)).join('') + "+\n" + \ textrows.join("\n") + \ "\n+" + (["-"] * (textrows[0].length - 2)).join('')+ "+" end |
#quit(id) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/wakame/master_managers/action_manager.rb', line 274 def quit(id) case test(id) when :runnable, :wait @self_m.synchronize { @id2res[id].keys.each { |r| @locks[r.to_s].delete_if{ |i| i == id } } @locks.delete_if{ |k,v| v.nil? || v.empty? } } @qbt_m.synchronize { @queue_by_thread.each {|t, q| q.enq(id) } } end @id2res.delete(id) Wakame.log.debug("#{self.class}: quit(#{id})" + "\n#{self.inspect}") end |
#reset ⇒ Object
239 240 241 242 243 244 245 246 |
# File 'lib/wakame/master_managers/action_manager.rb', line 239 def reset() @self_m.synchronize { @locks.keys { |k| @locks[k].clear } @id2res.clear } end |
#set(resource, id) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/wakame/master_managers/action_manager.rb', line 225 def set(resource, id) @self_m.synchronize { # Ths Job ID already holds/reserves the lock regarding the resource. return if @id2res.has_key?(id) && @id2res[id].has_key?(resource.to_s) @locks[resource.to_s] ||= [] @id2res[id] ||= {} @id2res[id][resource.to_s]=1 @locks[resource.to_s] << id } Wakame.log.debug("#{self.class}: set(#{resource.to_s}, #{id})" + "\n#{self.inspect}") end |
#test(id) ⇒ Object
248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/wakame/master_managers/action_manager.rb', line 248 def test(id) @self_m.synchronize { reslist = @id2res[id] return :pass if reslist.nil? || reslist.empty? if reslist.keys.all? { |r| id == @locks[r.to_s][0] } return :runnable else return :wait end } end |
#wait(id, tout = 60*30) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/wakame/master_managers/action_manager.rb', line 261 def wait(id, tout=60*30) @qbt_m.synchronize { @queue_by_thread[Thread.current] = ::Queue.new } timeout(tout) { while test(id) == :wait Wakame.log.debug("#{self.class}: Job #{id} waits for locked resouces: #{@id2res[id].keys.join(', ')}") break if id == @queue_by_thread[Thread.current].deq end } ensure @qbt_m.synchronize { @queue_by_thread.delete(Thread.current) } end |