Class: Pmux::StreamingMapper
Constant Summary
collapse
- CHUNK_SIZE = 8192
Instance Attribute Summary
Attributes inherited from Mapper
#exitstatus, #ifbase, #num_r, #tmp_dir
Instance Method Summary
collapse
Methods included from FixCmdLine
#fix_cmd_line
Methods inherited from Mapper
#initialize
Constructor Details
This class inherits a constructor from Pmux::Mapper
Instance Method Details
#do_map_task ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/pmux/mapper.rb', line 42
# Runs the mapper command synchronously and returns @ifbase.
#
# The command is @task['mapper'], or 'cat' when that is unset; the actual
# command line is built by fix_cmd_line.
#
# * When @num_r <= 1 the command line is built with "#{@ifbase}-0" as the
#   output argument and executed with Kernel#system.
# * Otherwise the command is run through IO.popen and its stdout is read in
#   CHUNK_SIZE pieces, each piece being handed to a TextPartitioner created
#   for @ifbase and @num_r (with the optional @task['separator']).
#
# The child's exit status is stored in @exitstatus.
#
# Raises RuntimeError carrying the contents of the error file
# ("#{tmp_dir}/.err.<pid>") when that file is non-empty, and RuntimeError
# ("failed to execute mapper: ...") when the exit status is greater than 1
# or is unavailable. An exit status of 1 is accepted.
def do_map_task
  mapper_cmd = @task['mapper'] || 'cat'
  err_path = "#{tmp_dir}/.err.#{$$}"
  if @num_r <= 1
    cmd_line = fix_cmd_line mapper_cmd,
      @path, "#{@ifbase}-0", err_path, tmp_dir
    Log.debug "system: #{cmd_line}"
    system cmd_line
  else
    partitioner = TextPartitioner.new @ifbase, @num_r,
      :separator=>@task['separator']
    begin
      cmd_line = fix_cmd_line mapper_cmd, @path, nil, err_path, tmp_dir
      IO.popen(cmd_line, 'r') {|io|
        until io.eof?
          partitioner.emit io.read(CHUNK_SIZE)
        end
      }
    ensure
      # Close the partitioner even when reading from the pipe fails.
      partitioner.close
    end
  end
  @exitstatus = $?.exitstatus
  if File.size? err_path
    # chomp (not chomp!): chomp! returns nil when there is no trailing
    # newline, which would discard the mapper's error message.
    raise RuntimeError, File.read(err_path).chomp
  end
  # exitstatus is nil when the child did not exit normally (e.g. it was
  # killed by a signal); report that as a failed mapper as well.
  if @exitstatus.nil? || @exitstatus > 1
    raise RuntimeError, "failed to execute mapper: #{cmd_line}"
  end
  @ifbase
end
|
#do_streaming_map_task ⇒ Object
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/pmux/mapper.rb', line 74
# Starts the mapper command as a PipeIO attached to @loop and wires up
# callbacks for its output and termination.
#
# The command is @task['mapper'], or 'cat' when that is unset; the actual
# command line is built by fix_cmd_line. The PipeIO is created while the
# working directory is tmp_dir.
#
# * When @num_r <= 1, received data is appended to "#{@ifbase}-0".
# * Otherwise received data is handed to a TextPartitioner created for
#   @ifbase and @num_r (with the optional @task['separator']).
#
# When the pipe closes, the output file / partitioner is closed. If the
# error file ("#{tmp_dir}/.err.<object_id>") is non-empty, the handler
# registered in @on_error (if any) is called with a RuntimeError carrying
# the file's contents; otherwise the handler in @on_success (if any) is
# called.
#
# Returns the result of @loop.attach.
def do_streaming_map_task
  mapper_cmd = @task['mapper'] || 'cat'
  err_path = "#{tmp_dir}/.err.#{object_id}"
  pipeio = nil
  out = nil
  partitioner = nil
  if @num_r <= 1
    cmd_line = fix_cmd_line mapper_cmd,
      @path, nil, err_path, tmp_dir
    Log.debug "pipe: #{cmd_line}"
    Dir.chdir(tmp_dir) {pipeio = PipeIO.new cmd_line}
    # File.open rather than Kernel#open: the argument is always a file path.
    out = File.open("#{@ifbase}-0", 'a')
    pipeio.on_receive {|data| out.write data}
  else
    partitioner = TextPartitioner.new @ifbase, @num_r,
      :separator=>@task['separator']
    cmd_line = fix_cmd_line mapper_cmd, @path, nil, err_path, tmp_dir
    Dir.chdir(tmp_dir) {pipeio = PipeIO.new cmd_line}
    pipeio.on_receive {|data| partitioner.emit data}
  end
  # Capture the handlers now so the close callback uses the ones that were
  # registered when the task was started.
  on_success = @on_success
  on_error = @on_error
  pipeio.on_close {
    if out
      begin
        out.close
      rescue IOError, SystemCallError
        # Best effort: a failure to close must not prevent the callbacks.
      end
    end
    partitioner.close if partitioner
    if File.size? err_path
      # chomp (not chomp!): chomp! returns nil when there is no trailing
      # newline, which would discard the mapper's error message.
      e = RuntimeError.new File.read(err_path).chomp
      e.set_backtrace ['mapper']
      on_error.call e if on_error
    else
      on_success.call if on_success
    end
  }
  @loop.attach pipeio
end
|
#on_error(&block) ⇒ Object
38
39
40
|
# File 'lib/pmux/mapper.rb', line 38
# Stores the given block in @on_error (nil when no block is given) and
# returns it. do_streaming_map_task calls it with the exception when the
# mapper's error file is non-empty.
def on_error(&handler)
  @on_error = handler
end
|
#on_success(&block) ⇒ Object
35
36
37
|
# File 'lib/pmux/mapper.rb', line 35
# Stores the given block in @on_success (nil when no block is given) and
# returns it. do_streaming_map_task calls it without arguments when the
# mapper's error file is absent or empty.
def on_success(&handler)
  @on_success = handler
end
|
#result_body ⇒ Object
119
120
121
|
# File 'lib/pmux/mapper.rb', line 119
# Returns the contents of "#{@ifbase}-0" when @num_r <= 1; returns nil
# otherwise.
def result_body
  return nil unless @num_r <= 1

  single_output = "#{@ifbase}-0"
  File.read(single_output)
end
|
#result_size ⇒ Object
115
116
117
|
# File 'lib/pmux/mapper.rb', line 115
# Returns File.size? of "#{@ifbase}-0" when @num_r <= 1 (nil when that file
# is missing or empty); returns nil otherwise.
def result_size
  return nil unless @num_r <= 1

  single_output = "#{@ifbase}-0"
  File.size?(single_output)
end
|