Class: Pmux::StreamingReducer
Instance Attribute Summary
Attributes inherited from Reducer
#exitstatus, #output_path, #tmp_dir
Instance Method Summary
collapse
Methods included from FixCmdLine
#fix_cmd_line
Methods inherited from Reducer
#initialize
Constructor Details
This class inherits a constructor from Pmux::Reducer
Instance Method Details
#do_reduce_task ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/pmux/reducer.rb', line 34
def do_reduce_task
reducer_cmd = @task['reducer'] || 'cat'
@output_path = "#{@tmp_dir}/r#{@task['pindex']}"
err_path = "#{@tmp_dir}/.err.#{$$}"
err_msg = nil
cmd_line = fix_cmd_line reducer_cmd,
"#{tmp_dir}/t*-#{@task['pindex']}", @output_path, err_path, tmp_dir
Log.debug "system: #{cmd_line}"
system cmd_line
@exitstatus = $?.exitstatus
if File.size? err_path
err_msg = File.read(err_path).chomp!
raise RuntimeError, err_msg
end
if @exitstatus > 1
raise RuntimeError, "failed to execute reducer: #{cmd_line}"
end
@output_path
end
|
#do_streaming_reduce_task ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/pmux/reducer.rb', line 56
def do_streaming_reduce_task
reducer_cmd = @task['reducer'] || 'cat'
@output_path = "#{@tmp_dir}/r#{@task['pindex']}"
err_path = "#{@tmp_dir}/.rerr.#{$$}"
err_msg = nil
cmd_line = fix_cmd_line reducer_cmd,
"#{tmp_dir}/t*-#{@task['pindex']}", nil, err_path, tmp_dir
Log.debug "popen: #{cmd_line}"
pipeio = nil
Dir.chdir(@tmp_dir) {pipeio = PipeIO.new cmd_line}
if @on_receive
pipeio.on_receive &@on_receive
else
out = open(@output_path, 'a')
pipeio.on_receive {|data|
out.write data
}
end
on_success = @on_success
on_error = @on_error
pipeio.on_close {
if out
out.close rescue nil
end
if File.size? err_path
err_msg = File.read(err_path).chomp!
e = RuntimeError.new err_msg
e.set_backtrace ['reducer']
on_error.call e if on_error
else
on_success.call self if on_success
end
}
@loop.attach pipeio
end
|
#on_error(&block) ⇒ Object
30
31
32
|
# File 'lib/pmux/reducer.rb', line 30
def on_error &block
@on_error = block
end
|
#on_receive(&block) ⇒ Object
24
25
26
|
# File 'lib/pmux/reducer.rb', line 24
def on_receive &block
@on_receive = block
end
|
#on_success(&block) ⇒ Object
27
28
29
|
# File 'lib/pmux/reducer.rb', line 27
def on_success &block
@on_success = block
end
|