Class: TaskMaster

Inherits:
Object
  • Object
show all
Defined in:
ext/lib/CompLearnLib/TaskMasterMPI.rb,
ext/lib/CompLearnLib/TaskMasterSingle.rb

Constant Summary collapse

@@haveInitted =
false

Class Method Summary collapse

Class Method Details

.doMasterInit ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 27

# One-time setup for the master process.
#
# Does nothing on repeat calls (guarded by @@haveInitted). On the first call it
# prints the MPI world size, registers an at_exit hook that sends an ExitMsg to
# every rank in @slaves, appends ranks 1..size-1 to @free, and snapshots that
# list into @slaves.
#
# @return [Object, nil] nil when already initialised, otherwise the new @slaves
def TaskMaster.doMasterInit()
  return if @@haveInitted
  @@haveInitted = true

  sz = MPI::Comm::WORLD.size()
  puts "Starting calculation with #{sz} nodes"

  # The hook runs at interpreter shutdown, by which time @slaves has been
  # assigned below.
  Kernel.at_exit do
    farewell = ExitMsg.new
    @slaves.each do |rank|
      MPI::Comm::WORLD.send(farewell, rank, 0)
    end
  end

  # Every rank except 0 starts out free.
  (1...sz).each { |rank| @free << rank }
  @slaves = @free.clone
end

.doSlaveLoop ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 42

# Message loop run by non-master ranks; it only ends via exit(0).
#
# Each message received from rank 0 (tag 0) is checked against three types:
# * ExitMsg - terminate the process with status 0
# * Task    - call its execute method
# * Store   - save its val under its key in @storage
# Messages of any other type are ignored.
def TaskMaster.doSlaveLoop()
  # 19 is the highest nice value (lowest scheduling priority) on typical Unix systems.
  Process.setpriority(Process::PRIO_PROCESS, Process.pid, 19)
  while true
    message, _status = MPI::Comm::WORLD.recv(0, 0)
    # The checks are deliberately independent (not a case/when): a message
    # matching more than one type triggers every matching action.
    exit(0) if message.is_a?(ExitMsg)
    message.execute() if message.is_a?(Task)
    @storage[message.key] = message.val if message.is_a?(Store)
  end
end

.enqueue(t, &cb) ⇒ Object



101
102
103
104
105
106
107
108
109
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 101

# Registers a task under a fresh id and sends it to a free slave.
#
# The id is @maxTid + 1. The task and its optional callback are recorded in
# @tasks and @cbs under that id before a slave is obtained from
# TaskMaster.getFreeSlave.
#
# @param t [Object] task to dispatch; must respond to setTid and tid
# @param cb [Proc, nil] optional block stored in @cbs under the task's id
# @return [Object] whatever MPI::Comm::WORLD.send returns
def TaskMaster.enqueue(t, &cb)
  @maxTid += 1
  t.setTid(@maxTid)
  @cbs[t.tid] = cb
  @tasks[t.tid] = t
  nextFreeSlave = TaskMaster.getFreeSlave
  # The task object itself is handed to send; no separate serialization
  # step is performed here.
  MPI::Comm::WORLD.send(t, nextFreeSlave, 0)
end

.fetch(key) ⇒ Object



23
24
25
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 23

# Looks up +key+ in this process's @storage.
#
# @param key [Object] key to look up
# @return [Object] the result of @storage[key]
def TaskMaster.fetch(key)
  @storage[key]
end

.getFreeSlave ⇒ Object



66
67
68
69
70
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 66

# Removes and returns the first entry of @free.
#
# When @free is empty, waitForSlave is called once first to process an
# incoming reply.
#
# @return [Object] the entry shifted off @free
def TaskMaster.getFreeSlave()
  waitForSlave() if @free.size.zero?
  @free.shift
end

.handleReply(reply, status) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 80

# Processes one reply from a slave.
#
# If a callback is stored for the reply's task id, it is called with the
# result object, the stored task and the sender's rank, and its slot is set
# to nil. The task slot is then set to nil and the sender is appended to @free.
#
# @param reply [Array] pair of (task id, result object)
# @param status [Object] receive status; its source gives the sender's rank
# @return [Object] the result of appending the sender to @free
def TaskMaster.handleReply(reply, status)
  tid, result = reply
  callback = @cbs[tid]
  if callback
    callback.call(result, @tasks[tid], status.source)
    @cbs[tid] = nil
  end
  @tasks[tid] = nil
  @free << status.source
end

.init ⇒ Object



58
59
60
61
62
63
64
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 58

# Entry point that chooses a role from the MPI rank: rank 0 runs
# TaskMaster.doMasterInit, every other rank runs TaskMaster.doSlaveLoop.
#
# @return [Object] the return value of the branch that ran
def TaskMaster.init()
  return TaskMaster.doSlaveLoop() unless MPI::Comm::WORLD.rank() == 0

  TaskMaster.doMasterInit()
end

.reply(t, reply) ⇒ Object



76
77
78
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 76

# Sends the pair [task.tid, result] to rank 0 with tag 0.
#
# @param task [Object] the task being answered; must respond to tid
# @param result [Object] the value to send with the task id
# @return [Object] whatever MPI::Comm::WORLD.send returns
def TaskMaster.reply(task, result)
  payload = [task.tid, result]
  MPI::Comm::WORLD.send(payload, 0, 0)
end

.storeEverywhere(key, val) ⇒ Object



95
96
97
98
99
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 95

# Sends a Store message carrying +key+ and +val+ to every rank in @slaves.
#
# @param key [Object] key passed to Store.new
# @param val [Object] value passed to Store.new
# @return [Object] the result of @slaves.each
def TaskMaster.storeEverywhere(key, val)
  @slaves.each do |rank|
    # A new Store is built for each recipient.
    message = Store.new(key, val)
    MPI::Comm::WORLD.send(message, rank, 0)
  end
end

.waitForReply ⇒ Object



72
73
74
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 72

# Delegates to TaskMaster.waitForSlave, which receives one message and passes
# it to TaskMaster.handleReply.
#
# @return [Object] whatever TaskMaster.waitForSlave returns
def TaskMaster.waitForReply()
  TaskMaster.waitForSlave()
end

.waitForSlave ⇒ Object



90
91
92
93
# File 'ext/lib/CompLearnLib/TaskMasterMPI.rb', line 90

# Receives one tag-0 message from any rank and passes it, with its receive
# status, to TaskMaster.handleReply.
#
# @return [Object] whatever TaskMaster.handleReply returns
def TaskMaster.waitForSlave()
  message, envelope = MPI::Comm::WORLD.recv(MPI::Comm::ANY_SOURCE, 0)
  TaskMaster.handleReply(message, envelope)
end