Class: Pwrake::CommunicatorSet

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/pwrake/branch/communicator_set.rb,
lib/pwrake/mpi/branch.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(master_rd, selector, option) ⇒ CommunicatorSet

Returns a new instance of CommunicatorSet.



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/pwrake/branch/communicator_set.rb', line 9

# Builds an (initially empty) set of worker communicators for a branch.
#
# master_rd - reader whose #get_line is consumed by #create_communicators
# selector  - event-loop object; #run / #clear are invoked on it by this class
# option    - option hash; only option[:heartbeat] is read here
#
# When option[:heartbeat] is set, the timeout later passed to @selector.run is
# heartbeat + 30 (same unit as the option — presumably seconds, TODO confirm);
# otherwise the timeout is nil.
def initialize(master_rd, selector, option)
  @master_rd = master_rd
  @selector = selector
  @option = option
  @communicators = {}
  @error_host = []
  # Hash, matching the snapshot #create_communicators stores (@communicators.dup).
  @initial_communicators = {}
  heartbeat = @option[:heartbeat]
  # Always assigned, so readers never see an uninitialized instance variable.
  @heartbeat_timeout = heartbeat ? heartbeat + 30 : nil
  # Extension hook (a no-op here; overridden for pwrake-mpi).
  init_hosts
end

Instance Attribute Details

#ipaddr_to_rank ⇒ Object (readonly)

Returns the value of attribute ipaddr_to_rank.



45
46
47
# File 'lib/pwrake/mpi/branch.rb', line 45

def ipaddr_to_rank
  @ipaddr_to_rank
end

#rank_to_ipaddr ⇒ Object (readonly)

Returns the value of attribute rank_to_ipaddr.



46
47
48
# File 'lib/pwrake/mpi/branch.rb', line 46

# Read-only accessor for @rank_to_ipaddr (populated by the pwrake-mpi extension — TODO confirm).
def rank_to_ipaddr; @rank_to_ipaddr; end

#selector ⇒ Object (readonly)

Returns the value of attribute selector.



26
27
28
# File 'lib/pwrake/branch/communicator_set.rb', line 26

# Read-only accessor for the selector object given to the constructor.
def selector; @selector; end

Instance Method Details

#add(comm) ⇒ Object



53
54
55
# File 'lib/pwrake/branch/communicator_set.rb', line 53

# Registers +comm+ under its #id, replacing any entry already stored with that id.
# Returns +comm+.
def add(comm)
  key = comm.id
  @communicators.store(key, comm)
end

#create_communicators ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pwrake/branch/communicator_set.rb', line 30

# Reads the host list sent by the master and creates one Communicator per host.
#
# Expected protocol (one line each):
#   host_list_begin
#   host:<id> <hostname>[ <ncore>]   (ncore optional, may be signed)
#   host_list_end
#
# The reading runs inside a Fiber and is followed by @selector.run; presumably
# @master_rd.get_line can suspend the fiber until input arrives and the selector
# resumes it (TODO confirm against the reader implementation).
#
# Returns the snapshot of @communicators stored in @initial_communicators.
# Raises RuntimeError if the first line is not "host_list_begin" (including EOF)
# or a host line is malformed. EOF before "host_list_end" ends the list silently.
def create_communicators
  Fiber.new do
    line = @master_rd.get_line
    line = line.chomp if line
    if line != "host_list_begin"
      raise "Branch#setup_worker: recv=#{line || '(EOF)'} expected=host_list_begin"
    end

    while (line = @master_rd.get_line)
      line = line.chomp
      break if line == "host_list_end"
      # ncore is optional; the separating space before it may or may not be present.
      m = /\Ahost:(?<id>\d+) (?<host>\S+)(?: (?<ncore>[+-]?\d+)?)?\z/.match(line)
      unless m
        raise "Branch#setup_worker: recv=#{line} expected=host:id hostname ncore"
      end
      ncore = m[:ncore] && m[:ncore].to_i
      @communicators[m[:id]] = Communicator.new(self, m[:id], m[:host], ncore, @selector, @option)
    end
  end.resume
  @selector.run(@heartbeat_timeout)
  @initial_communicators = @communicators.dup
end

#delete(comm) ⇒ Object



57
58
59
60
# File 'lib/pwrake/branch/communicator_set.rb', line 57

# Removes +comm+ from the active set and records its host in @error_host
# (the list reported as "retired hosts" by #run). Returns @error_host.
def delete(comm)
  removed_id = comm.id
  @communicators.delete(removed_id)
  @error_host.push(comm.host)
end

#drop(id) ⇒ Object



62
63
64
65
66
# File 'lib/pwrake/branch/communicator_set.rb', line 62

# Calls #dropout on the communicator registered under +id+, if there is one.
# Always logs the lookup at debug level; returns nil when +id+ is unknown.
def drop(id)
  comm = @communicators[id]
  Log.debug "drop:id=#{id} comm=#{comm.inspect} @communicators.keys=#{@communicators.keys}"
  return unless comm
  comm.dropout
end

#drop_all ⇒ Object



68
69
70
71
72
73
# File 'lib/pwrake/branch/communicator_set.rb', line 68

# Calls #dropout on every registered communicator.
#
# Iterates over a snapshot of the ids taken up front, so entries may be removed
# from @communicators (presumably via #delete) while communicators drop out.
# Returns that id snapshot.
def drop_all
  Log.debug "drop_all"
  ids = @communicators.keys
  ids.each { |cid| @communicators[cid].dropout }
end

#exit ⇒ Object



103
104
105
106
107
# File 'lib/pwrake/branch/communicator_set.rb', line 103

# Clears the selector, passes the current handler_set to NBIO::Handler.exit,
# then runs the selector again (no timeout argument) and returns its result.
def exit
  @selector.clear
  handlers = handler_set
  NBIO::Handler.exit(handlers)
  @selector.run
end

#finish_shells ⇒ Object



75
76
77
78
79
80
# File 'lib/pwrake/branch/communicator_set.rb', line 75

# Forwards #finish_shells to every registered communicator.
# Walks a snapshot of the ids (taken before the loop) and returns that snapshot.
def finish_shells
  Log.debug "finish_shells"
  @communicators.keys.each do |cid|
    comm = @communicators[cid]
    comm.finish_shells
  end
end

#handler_set ⇒ Object



93
94
95
# File 'lib/pwrake/branch/communicator_set.rb', line 93

# Returns the #handler of each registered communicator, in insertion order,
# skipping communicators whose handler is nil.
def handler_set
  @communicators.values.map(&:handler).compact
end

#init_hosts ⇒ Object



22
23
24
# File 'lib/pwrake/branch/communicator_set.rb', line 22

# Extension hook called at the end of #initialize; intentionally does nothing
# here and is meant to be redefined for pwrake-mpi. Returns nil.
def init_hosts
  nil
end

#kill(sig) ⇒ Object



97
98
99
100
101
# File 'lib/pwrake/branch/communicator_set.rb', line 97

# Clears the selector, passes the current handler_set and +sig+ to
# NBIO::Handler.kill, then runs the selector again (no timeout argument)
# and returns its result.
def kill(sig)
  @selector.clear
  handlers = handler_set
  NBIO::Handler.kill(handlers, sig)
  @selector.run
end

#run(message) ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/pwrake/branch/communicator_set.rb', line 82

# Runs the selector once (bounded by @heartbeat_timeout) on behalf of +message+.
#
# Resets @error_host first. If the number of registered communicators changed
# during the run, logs the change and the hosts recorded in @error_host.
# Returns nil when the count is unchanged.
def run(message)
  @error_host = []
  before = @communicators.size
  @selector.run(@heartbeat_timeout)
  after = @communicators.size
  return if before == after

  Log.info "# of communicators: #{before}->#{after} during #{message.inspect}"
  Log.info "retired hosts=[#{@error_host.join(',')}]"
end