Class: Pwrake::CommunicatorSet
- Inherits:
-
Object
- Object
- Pwrake::CommunicatorSet
- Extended by:
- Forwardable
- Defined in:
- lib/pwrake/branch/communicator_set.rb,
lib/pwrake/mpi/branch.rb
Instance Attribute Summary collapse
-
#ipaddr_to_rank ⇒ Object
readonly
Returns the value of attribute ipaddr_to_rank.
-
#rank_to_ipaddr ⇒ Object
readonly
Returns the value of attribute rank_to_ipaddr.
-
#selector ⇒ Object
readonly
Returns the value of attribute selector.
Instance Method Summary collapse
- #add(comm) ⇒ Object
- #create_communicators ⇒ Object
- #delete(comm) ⇒ Object
- #drop(id) ⇒ Object
- #drop_all ⇒ Object
- #exit ⇒ Object
- #finish_shells ⇒ Object
- #handler_set ⇒ Object
- #init_hosts ⇒ Object
-
#initialize(master_rd, selector, option) ⇒ CommunicatorSet
constructor
A new instance of CommunicatorSet.
- #kill(sig) ⇒ Object
- #run(message) ⇒ Object
Constructor Details
#initialize(master_rd, selector, option) ⇒ CommunicatorSet
Returns a new instance of CommunicatorSet.
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/pwrake/branch/communicator_set.rb', line 9 def initialize(master_rd,selector,option) @master_rd = master_rd @selector = selector @option = option @communicators = {} @error_host = [] @initial_communicators = [] if hb = @option[:heartbeat] @heartbeat_timeout = hb + 30 end init_hosts end |
Instance Attribute Details
#ipaddr_to_rank ⇒ Object (readonly)
Returns the value of attribute ipaddr_to_rank.
45 46 47 |
# File 'lib/pwrake/mpi/branch.rb', line 45 def ipaddr_to_rank @ipaddr_to_rank end |
#rank_to_ipaddr ⇒ Object (readonly)
Returns the value of attribute rank_to_ipaddr.
46 47 48 |
# File 'lib/pwrake/mpi/branch.rb', line 46 def rank_to_ipaddr @rank_to_ipaddr end |
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
26 27 28 |
# File 'lib/pwrake/branch/communicator_set.rb', line 26 def selector @selector end |
Instance Method Details
#add(comm) ⇒ Object
53 54 55 |
# File 'lib/pwrake/branch/communicator_set.rb', line 53 def add(comm) @communicators[comm.id] = comm end |
#create_communicators ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pwrake/branch/communicator_set.rb', line 30 def create_communicators Fiber.new do s = @master_rd.get_line if s.chomp != "host_list_begin" raise "Branch#setup_worker: recv=#{s.chomp} expected=host_list_begin" end while s = @master_rd.get_line s.chomp! break if s == "host_list_end" if /^host:(\d+) (\S+) ([+-]?\d+)?$/ =~ s id, host, ncore = $1,$2,$3 ncore &&= ncore.to_i @communicators[id] = Communicator.new(self,id,host,ncore,@selector,@option) else raise "Branch#setup_worker: recv=#{s.chomp} expected=host:id hostname ncore" end end end.resume @selector.run(@heartbeat_timeout) @initial_communicators = @communicators.dup end |
#delete(comm) ⇒ Object
57 58 59 60 |
# File 'lib/pwrake/branch/communicator_set.rb', line 57 def delete(comm) @communicators.delete(comm.id) @error_host << comm.host end |
#drop(id) ⇒ Object
62 63 64 65 66 |
# File 'lib/pwrake/branch/communicator_set.rb', line 62 def drop(id) comm = @communicators[id] Log.debug "drop:id=#{id} comm=#{comm.inspect} @communicators.keys=#{@communicators.keys}" comm.dropout if comm end |
#drop_all ⇒ Object
68 69 70 71 72 73 |
# File 'lib/pwrake/branch/communicator_set.rb', line 68 def drop_all Log.debug "drop_all" @communicators.keys.each do |id| @communicators[id].dropout end end |
#exit ⇒ Object
103 104 105 106 107 |
# File 'lib/pwrake/branch/communicator_set.rb', line 103 def exit @selector.clear NBIO::Handler.exit(handler_set) @selector.run end |
#finish_shells ⇒ Object
75 76 77 78 79 80 |
# File 'lib/pwrake/branch/communicator_set.rb', line 75 def finish_shells Log.debug "finish_shells" @communicators.keys.each do |id| @communicators[id].finish_shells end end |
#handler_set ⇒ Object
93 94 95 |
# File 'lib/pwrake/branch/communicator_set.rb', line 93 def handler_set @communicators.each_value.map{|comm| comm.handler}.compact end |
#init_hosts ⇒ Object
22 23 24 |
# File 'lib/pwrake/branch/communicator_set.rb', line 22 def init_hosts # for pwrake-mpi end |
#kill(sig) ⇒ Object
97 98 99 100 101 |
# File 'lib/pwrake/branch/communicator_set.rb', line 97 def kill(sig) @selector.clear NBIO::Handler.kill(handler_set,sig) @selector.run end |
#run(message) ⇒ Object
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/pwrake/branch/communicator_set.rb', line 82 def run(message) @error_host = [] n1 = @communicators.size @selector.run(@heartbeat_timeout) n2 = @communicators.size if n1 != n2 Log.info "# of communicators: #{n1}->#{n2} during #{message.inspect}" Log.info "retired hosts=[#{@error_host.join(',')}]" end end |