Class: TaskMaster
- Inherits:
- Object
- Inheritance hierarchy:
- Object
- TaskMaster
- Defined in:
- ext/lib/CompLearnLib/TaskMasterMPI.rb,
ext/lib/CompLearnLib/TaskMasterSingle.rb
Constant Summary collapse
- @@haveInitted =
false
Class Method Summary collapse
- .doMasterInit ⇒ Object
- .doSlaveLoop ⇒ Object
- .enqueue(t, &cb) ⇒ Object
- .fetch(key) ⇒ Object
- .getFreeSlave ⇒ Object
- .handleReply(reply, status) ⇒ Object
- .init ⇒ Object
- .reply(t, reply) ⇒ Object
- .storeEverywhere(key, val) ⇒ Object
- .waitForReply ⇒ Object
- .waitForSlave ⇒ Object
Class Method Details
.doMasterInit ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 27
#
# One-time setup for the master process. Later calls return immediately
# because the @@haveInitted flag is set on the first call.
#
# Prints the MPI world size, registers an at_exit hook that sends an ExitMsg
# to every rank recorded in @slaves, appends every rank except 0 to @free,
# and snapshots that list into @slaves.
#
# NOTE(review): @free is appended to here but not created here; it must
# already hold a collection when this method runs -- confirm where it is set.
#
# @return [nil, Object] nil when already initialised, otherwise the value
#   assigned to @slaves
def TaskMaster.doMasterInit
  return if @@haveInitted

  @@haveInitted = true
  node_count = MPI::Comm::WORLD.size
  puts "Starting calculation with #{node_count} nodes"

  # The hook reads @slaves when the process exits, i.e. after the
  # assignment at the bottom of this method has taken place.
  Kernel.at_exit do
    shutdown = ExitMsg.new
    @slaves.each { |rank| MPI::Comm::WORLD.send(shutdown, rank, 0) }
  end

  # Rank 0 is skipped: it is never added to the free list.
  1.upto(node_count - 1) { |rank| @free << rank }
  @slaves = @free.clone
end
.doSlaveLoop ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 42
#
# Receive-and-dispatch loop run by non-master processes. Does not return
# normally; the process leaves it by calling exit(0) when an ExitMsg arrives.
#
# For every received object:
# * ExitMsg -> exit(0)
# * Task    -> task.execute
# * Store   -> task.val is saved in @storage under task.key
# The three checks are independent, so an object matching more than one
# class triggers each matching action. Anything else is ignored.
def TaskMaster.doSlaveLoop
  # Priority value 19 is the lowest scheduling priority on typical Unix systems.
  Process.setpriority(Process::PRIO_PROCESS, Process.pid, 19)

  # Deliberately `while true` rather than `loop`: `loop` would silently
  # swallow a StopIteration raised while executing a task.
  while true
    # recv(0, 0): presumably (source rank, tag) -- verify against the MPI binding.
    message, _status = MPI::Comm::WORLD.recv(0, 0)

    exit(0) if message.is_a?(ExitMsg)
    message.execute if message.is_a?(Task)
    @storage[message.key] = message.val if message.is_a?(Store)
  end
end
.enqueue(t, &cb) ⇒ Object
101 102 103 104 105 106 107 108 109 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 101
#
# Assigns the next task id to +t+, records the task and its completion
# callback, then sends the task to a free slave.
#
# @param t [Object] the task; must respond to setTid and tid
# @param cb [Proc, nil] optional block stored in @cbs under the task id.
#   When no block is given nil is stored, and handleReply skips the
#   callback for a nil entry.
# @return [Object] whatever MPI::Comm::WORLD.send returns
def TaskMaster.enqueue(t, &cb)
  @maxTid += 1
  t.setTid(@maxTid)
  @cbs[t.tid] = cb
  @tasks[t.tid] = t

  # getFreeSlave waits for a reply when the free list is empty.
  nextFreeSlave = TaskMaster.getFreeSlave

  # The original also ran Marshal.dump(t) into a local that was never read;
  # the task object itself is what gets sent, so that extra serialisation
  # pass was dead work and has been removed.
  MPI::Comm::WORLD.send(t, nextFreeSlave, 0)
end
.fetch(key) ⇒ Object
23 24 25 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 23
#
# Looks up +key+ in @storage using the index operator.
#
# @param key [Object] the key to look up
# @return [Object] the result of @storage[key]
#   (presumably nil for an unknown key if @storage is a plain Hash -- verify)
def TaskMaster.fetch(key)
  @storage[key]
end
.getFreeSlave ⇒ Object
66 67 68 69 70 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 66
#
# Removes and returns the first entry of the @free list. When the list is
# empty, waitForSlave is called once beforehand; its reply handling appends
# the replying slave to @free.
#
# @return [Object] the entry shifted off the front of @free
def TaskMaster.getFreeSlave
  waitForSlave() if @free.empty?
  @free.shift
end
.handleReply(reply, status) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 80
#
# Processes one reply: runs the callback registered for the task id (if
# any), clears the bookkeeping entries for that id, and puts the sender
# back on the free list.
#
# @param reply [Array] a two-element [tid, result] pair
# @param status [Object] receive status; status.source identifies the sender
# @return [Object] the result of appending status.source to @free
def TaskMaster.handleReply(reply, status)
  tid, payload = reply

  callback = @cbs[tid]
  if callback
    # The callback receives the result, the original task and the sender.
    callback.call(payload, @tasks[tid], status.source)
    @cbs[tid] = nil
  end

  # The entries are set to nil rather than removed, as in the original.
  @tasks[tid] = nil
  @free << status.source
end
.init ⇒ Object
58 59 60 61 62 63 64 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 58
#
# Entry point: the process whose MPI rank is 0 performs the master
# initialisation; every other rank enters the slave loop.
#
# @return [Object] the return value of doMasterInit on rank 0
def TaskMaster.init
  return TaskMaster.doMasterInit if MPI::Comm::WORLD.rank == 0

  TaskMaster.doSlaveLoop
end
.reply(t, reply) ⇒ Object
76 77 78 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 76
#
# Sends a [task id, result] pair to rank 0.
#
# @param task [Object] the task being answered; must respond to tid
# @param result [Object] the value to report for that task
# @return [Object] whatever MPI::Comm::WORLD.send returns
def TaskMaster.reply(task, result)
  answer = [task.tid, result]
  MPI::Comm::WORLD.send(answer, 0, 0)
end
.storeEverywhere(key, val) ⇒ Object
95 96 97 98 99 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 95
#
# Sends a Store message carrying +key+ and +val+ to every rank in @slaves.
# A new Store object is built for each recipient.
#
# @param key [Object] key placed in each Store message
# @param val [Object] value placed in each Store message
# @return [Object] @slaves (the receiver of each)
def TaskMaster.storeEverywhere(key, val)
  @slaves.each do |rank|
    record = Store.new(key, val)
    MPI::Comm::WORLD.send(record, rank, 0)
  end
end
.waitForReply ⇒ Object
72 73 74 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 72
#
# Alias-style wrapper: handles exactly one incoming reply by delegating to
# TaskMaster.waitForSlave.
#
# @return [Object] the return value of TaskMaster.waitForSlave
def TaskMaster.waitForReply
  TaskMaster.waitForSlave
end
.waitForSlave ⇒ Object
90 91 92 93 |
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 90
#
# Receives one message from any source and hands the message together with
# its receive status to TaskMaster.handleReply.
#
# @return [Object] the return value of TaskMaster.handleReply
def TaskMaster.waitForSlave
  message, origin = MPI::Comm::WORLD.recv(MPI::Comm::ANY_SOURCE, 0)
  TaskMaster.handleReply(message, origin)
end