10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/rubadoop/map_reduce/reducable.rb', line 10
# Groups consecutive input records that share a key and hands each group to
# the caller.
#
# Records are read line by line from +MapReduce.io_in+; each line is chomped
# and split into a key and a value with +key_value_split+. A group ends when
# the key changes or the input is exhausted.
#
# The values of a group are produced by a TrackingEnumerator that reads from
# the input while it is being iterated. If the block returns without
# exhausting the enumerator, the rest of that group is read and discarded
# before the next group starts.
#
# NOTE(review): the enumerator pulls from the shared input stream, so it
# looks safe to iterate only once per group - confirm with callers.
#
# @yieldparam key the key shared by every value in the group
# @yieldparam values [TrackingEnumerator] streams the group's values
# @return [Array<Hash>, nil] without a block, an array of
#   <tt>{key: ..., values: [...]}</tt> hashes (empty for empty input);
#   +nil+ when a block is given
def reducer(&block)
  collected = []
  record = MapReduce.io_in.gets

  # Nothing to read at all: mirror the normal return contract and stop.
  if record.nil?
    return nil if block_given?
    return collected
  end

  record.chomp!
  key, value = key_value_split(record)
  current_key = key

  # Must live outside the loop block so it survives from one pass to the next.
  group = nil

  loop do
    # The previous group was not (fully) consumed by the caller: read past
    # whatever is left of it so the stream is positioned on the next key.
    if group && !group.flushed
      group.each { |_unused| }
      break if record.nil?
    end

    group = TrackingEnumerator.new do |yielder|
      # +value+ already holds the first value of this group.
      yielder << value
      until (record = MapReduce.io_in.gets).nil?
        record.chomp!
        key, value = key_value_split(record)
        break if key != current_key
        yielder << value
      end
      # Reached on end of input or on a key change: the group is complete.
      group.flushed = true
      current_key = key
    end
    group.flushed = false

    if block_given?
      block.call(current_key, group)
    else
      # current_key is read before to_a advances it to the next group's key.
      collected << { key: current_key, values: group.to_a }
    end

    break if record.nil?
  end

  block_given? ? nil : collected
end
|