Method: Pmux::StreamingReducer#do_reduce_task

Defined in:
lib/pmux/reducer.rb

#do_reduce_taskObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pmux/reducer.rb', line 34

def do_reduce_task
  reducer_cmd = @task['reducer'] || 'cat'
  @output_path = "#{@tmp_dir}/r#{@task['pindex']}"
  err_path = "#{@tmp_dir}/.err.#{$$}"
  err_msg = nil
  #cmd_line = fix_cmd_line reducer_cmd,
  #  @paths.join(' '), @output_path, err_path, tmp_dir
  cmd_line = fix_cmd_line reducer_cmd,
    "#{tmp_dir}/t*-#{@task['pindex']}", @output_path, err_path, tmp_dir
  Log.debug "system: #{cmd_line}"
  system cmd_line
  @exitstatus = $?.exitstatus
  if File.size? err_path
    err_msg = File.read(err_path).chomp!
    raise RuntimeError, err_msg
  end
  if @exitstatus > 1
    raise RuntimeError, "failed to execute reducer: #{cmd_line}"
  end
  @output_path
end