Class: Pwrake::Invoker

Inherits:
Object
  • Object
show all
Defined in:
lib/pwrake/mpi/worker.rb,
lib/pwrake/worker/invoker.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Invoker

Returns a new instance of Invoker.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pwrake/worker/invoker.rb', line 33

# Builds a worker-side invoker: establishes the connection, initialises
# the shared-directory class and the logger, then reports the IP address
# and core count through send_ipaddr / send_ncore.
#
# Statement order matters: setup_connection must run first because it
# assigns @option, @out and @ncore, which the following lines read.
def initialize
  setup_connection
  # @option[:shared_directory] names a constant under Pwrake; that
  # constant is later handed to the log and to each Executor.
  @dir_class = Pwrake.const_get(@option[:shared_directory])
  @dir_class.init(@option)
  # executors, keyed by the id string received on the command channel
  @ex_list = {}
  @log = LogExecutor.instance
  @log.init(@option)
  @log.open(@dir_class)
  @out.add_logger(@log)
  send_ipaddr
  send_ncore
  # does NOT exit when writing to broken pipe
  Signal.trap("PIPE", "SIG_IGN")
end

Instance Method Details

#close_all ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
# File 'lib/pwrake/worker/invoker.rb', line 185

# Shuts the worker down: kills the heartbeat thread when one exists,
# changes the working directory, closes and then joins every executor,
# and closes the log with a 20-second limit.
#
# "exited" is written to @out in every case, including when one of the
# steps above raises.
def close_all
  begin
    @log.info "close_all"
    heartbeat = @heartbeat_thread
    heartbeat.kill if heartbeat
    # without an argument Dir.chdir switches to the home directory
    Dir.chdir
    # close everything first, then wait for each executor to finish
    @ex_list.each_value(&:close)
    @ex_list.each_value(&:join)
    @log.info "worker:end:#{@ex_list.keys.inspect}"
    Timeout.timeout(20) { @log.close }
  ensure
    @out.puts "exited"
  end
end

#command_callback ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/pwrake/worker/invoker.rb', line 137

# Reads lines from @rd via get_line and dispatches them until get_line
# returns nothing or common_line asks to stop:
#
#   "<id>:exit"  - removes executor <id> from @ex_list, closes and joins it
#   "<id>:<cmd>" - passes <cmd> to executor <id>
#   anything else is handed to common_line; a true result ends the loop
#
# Raises RuntimeError when a line names an id that has no executor
# (previously this surfaced as a NoMethodError on nil), and when @rd
# reports end-of-file once the loop is over.
def command_callback
  while line = get_line(@rd)
    case line
    when /^(\d+):(.*)$/
      id, cmd = $1, $2
      ex = @ex_list[id]
      unless ex
        # same reporting style as common_line uses for unusable lines
        msg = "no executor for id=#{id}: #{line.inspect}"
        @log.fatal msg
        raise RuntimeError, msg
      end
      if cmd == "exit"
        @ex_list.delete(id)
        ex.close
        ex.join
      else
        ex.execute(cmd.chomp)
      end
    else
      break if common_line(line)
    end
  end
  if @rd.eof?
    # connection lost
    raise RuntimeError, "lost connection to master"
  end
end

#common_line(line) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/pwrake/worker/invoker.rb', line 158

# Handles the control lines that do not address a single executor.
#
#   "exit"       - returns true (the caller should stop reading)
#   "kill:<sig>" - forwards <sig> to kill_all, as an Integer when it is
#                  all digits, otherwise as the string; returns false
#   "p"          - dumps @ex_list to $stderr; returns false
#
# Any other line is logged as fatal and raised as a RuntimeError.
def common_line(line)
  case line
  when /^exit$/o
    true
  when /^kill:(.*)$/o
    signal = Regexp.last_match(1)
    # numeric signals are passed on as integers
    signal = signal.to_i if /^\d+$/o =~ signal
    kill_all(signal)
    false
  when /^p$/o
    $stderr.puts "@ex_list = #{@ex_list.inspect}"
    false
  else
    message = "invalid line: #{line.inspect}"
    @log.fatal message
    raise RuntimeError, message
  end
end

#get_io ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pwrake/mpi/worker.rb', line 20

# Collects the IP addresses of this host's interfaces that have the
# multicast flag set, writes them to an MPipe as a 4-byte little-endian
# byte count followed by the '|'-joined list, and returns the triple
# that setup_connection unpacks as (selector class, reader, writer).
#
# NOTE(review): MPipe.new(0) presumably addresses MPI rank 0 — confirm
# against the mpipe library.
def get_io
  # get IP addresses
  # Ifaddr#addr is nil for an interface without an address; skip those
  # instead of failing with NoMethodError
  candidates = Socket.getifaddrs.select do |a|
    a.addr && a.addr.ip? && (a.flags & Socket::IFF_MULTICAST != 0)
  end
  s = candidates.map { |a| a.addr.ip_address }.join('|')
  # write IP addresses; the prefix is a byte count, so use bytesize
  iow = MPipe.new(0)
  iow.write([s.bytesize].pack("V"))
  iow.write(s)
  iow.flush
  # returns IO, $stdin, $stdout
  [MPipe, MPipe.new(0), MPipe.new(0)]
end

#get_line(io) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/pwrake/worker/invoker.rb', line 78

# Reads one line from +io+, trims it in place and logs it with a ">"
# prefix.
#
# @param io an object responding to #gets
# @return the trimmed line, or whatever falsy value #gets produced when
#   there was nothing to read (nothing is logged in that case)
def get_line(io)
  received = io.gets
  return received unless received

  received.chomp!
  received.strip!
  @log.info ">#{received}"
  received
end

#kill_all(sig) ⇒ Object



180
181
182
183
# File 'lib/pwrake/worker/invoker.rb', line 180

# Logs a warning and forwards +sig+ to every executor in @ex_list.
#
# @param sig the signal passed unchanged to each executor's #kill
def kill_all(sig)
  @log.warn "killing worker, signal=#{sig}"
  @ex_list.each_value { |executor| executor.kill(sig) }
end

#run ⇒ Object



88
89
90
91
92
93
# File 'lib/pwrake/worker/invoker.rb', line 88

# Entry point of the worker: runs the setup phase, then the command
# phase. close_all is invoked afterwards in every case, including when
# either phase raises.
def run
  begin
    run_setup
    run_command
  ensure
    close_all
  end
end

#run_command ⇒ Object



129
130
131
132
133
134
135
# File 'lib/pwrake/worker/invoker.rb', line 129

# Starts command_callback inside a Fiber and then runs the selector.
# Any StandardError raised along the way is logged (message followed by
# the backtrace, one entry per line) and answered with kill_all("TERM").
def run_command
  begin
    reader = Fiber.new { command_callback }
    reader.resume
    @selector.run
  rescue => exc
    report = [exc.to_s].concat(exc.backtrace).join("\n")
    @log.error(report)
    kill_all("TERM")
  end
end

#run_setup ⇒ Object



95
96
97
98
99
100
101
# File 'lib/pwrake/worker/invoker.rb', line 95

# Applies the received options, starts setup_loop inside a Fiber and
# runs the selector. A StandardError raised along the way is logged
# (message followed by the backtrace) and not re-raised.
def run_setup
  begin
    setup_option
    setup = Fiber.new { setup_loop }
    setup.resume
    @selector.run
  rescue => exc
    report = [exc.to_s].concat(exc.backtrace).join("\n")
    @log.error(report)
  end
end

#send_ipaddr ⇒ Object



35
36
37
# File 'lib/pwrake/mpi/worker.rb', line 35

# Intentionally a no-op in this variant (defined in
# lib/pwrake/mpi/worker.rb); always returns nil.
# NOTE(review): get_io in the same file already writes the IP address
# list, which is presumably why nothing is sent here — confirm.
def send_ipaddr
  nil
end

#send_ncore ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pwrake/worker/invoker.rb', line 58

# Normalises @ncore and reports it on @out as "ncore:<n>".
#
#   Integer > 0  - used as is
#   Integer <= 0 - processor_count is added; the sum must be positive
#   nil          - replaced by processor_count
#   other        - rejected
#
# On rejection the reason is written to @out as "ncore:<reason>" and an
# ArgumentError carrying the same reason is raised.
def send_ncore
  reject = lambda do |reason|
    @out.puts "ncore:"+reason
    raise ArgumentError, reason
  end

  case @ncore
  when Integer
    # zero and negative values are relative to the processor count
    @ncore += processor_count() if @ncore <= 0
    reject.call("Out of range: ncore=#{@ncore.inspect}") if @ncore <= 0
  when nil
    @ncore = processor_count()
  else
    reject.call("Invalid argument: ncore=#{@ncore.inspect}")
  end
  @out.puts "ncore:#{@ncore}"
end

#setup_connection ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pwrake/worker/invoker.rb', line 21

# Obtains the I/O endpoints from get_io, reads the initial handshake and
# wires up the selector, the reader and the shared Writer.
#
# Handshake layout as read here: 8 bytes holding two 32-bit
# little-endian unsigned integers (ncore, payload length), followed by
# `len` bytes of Marshal data that become @option.
def setup_connection
  ioc, ior, iow = get_io()
  # read @ncore and @option
  # NOTE(review): "V" decodes unsigned values, so @ncore cannot come out
  # negative or nil from this read — confirm this matches what the
  # sender packs and the negative/nil branches in send_ncore.
  @ncore,len = ior.read(8).unpack("V2")
  # NOTE(review): Marshal.load can instantiate arbitrary objects; this
  # is only safe when the peer is trusted — confirm.
  @option = Marshal.load(ior.read(len))
  # set pipe to branch-master
  @selector = NBIO::Selector.new(ioc)
  @rd = NBIO::Reader.new(@selector,ior)
  @out = Writer.instance
  @out.out = iow
end

#setup_loop ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/pwrake/worker/invoker.rb', line 111

# Processes setup lines from @rd until "setup_end" arrives.
#
#   "<id>:open"  - creates an Executor for <id> and stores it in @ex_list
#   "setup_end"  - ends the loop
#   anything else goes to common_line; if that reports an exit request,
#   a RuntimeError ("exit during setup_loop") is raised
def setup_loop
  loop do
    request = get_line(@rd)
    case request
    when /^(\d+):open$/o
      Regexp.last_match(1).split.each do |worker_id|
        @ex_list[worker_id] = Executor.new(@selector, @dir_class, worker_id)
      end
    when "setup_end"
      break
    else
      raise RuntimeError, "exit during setup_loop" if common_line(request)
    end
  end
end

#setup_option ⇒ Object



103
104
105
106
107
108
109
# File 'lib/pwrake/worker/invoker.rb', line 103

# Logs the received option hash, hands @option[:heartbeat] to the
# writer, and copies every pair in @option[:pass_env] (if present) into
# ENV.
def setup_option
  @log.info @option.inspect
  @out.heartbeat = @option[:heartbeat]
  exported = @option[:pass_env] || {}
  exported.each { |name, value| ENV[name] = value }
end