Class: Join
- Inherits: Object
show all
- Defined in:
- lib/streaming_join/join.rb
Overview
Base Join class; performs an inner join.
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Join
Returns a new instance of Join.
3
4
5
6
7
8
9
10
|
# File 'lib/streaming_join/join.rb', line 3
# Builds a joiner configured from the environment.
#
# @param opts [Hash] options; :report (default true) toggles counter output
#   written by #report.
def initialize(opts = {})
  # Field separator of the incoming key/side/value records.
  @sep_in = ENV.fetch('stream_map_output_field_separator', "\t")
  # Separator placed between the fields of each emitted record.
  @sep_out = ENV.fetch('streaming_join_output_separator', "\t")
  # A value such as "\u0001", "\1" or "1" names a character by code.
  # NOTE(review): only decimal digits are accepted, yet they are parsed as hex.
  escape = /\A(?:\\u?)?(\d+)\Z/.match(@sep_out)
  @sep_out = escape[1].hex.chr if escape
  @report = opts.fetch(:report, true)
  @cols_l = ENV['streaming_join_cols_left'].to_i
  @cols_r = ENV['streaming_join_cols_right'].to_i
end
|
Instance Method Details
#null_left(key, right) ⇒ Object
28
29
|
# File 'lib/streaming_join/join.rb', line 28
# Hook for a right-hand value that has no left-hand match.
# This inner join discards it; the method does nothing and returns nil.
#
# @param key [String] join key
# @param right [String] the unmatched right-hand value
def null_left(key, right)
  nil
end
|
#null_right(key, left) ⇒ Object
31
32
|
# File 'lib/streaming_join/join.rb', line 31
# Hook for left-hand values whose key has no right-hand match.
# This inner join discards them; the method does nothing and returns nil.
#
# @param key [String] join key
# @param left [Array<String>] the unmatched left-hand values
def null_right(key, left)
  nil
end
|
#output(key, left, right) ⇒ Object
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/streaming_join/join.rb', line 17
# Emits one joined record per buffered left value:
# "<key><sep_out><left value><sep_out><right>".
# Each record is yielded when a block is given, otherwise printed to stdout.
#
# @param key [String] join key
# @param left [Array<String>] buffered left-hand values for this key
# @param right [String] the current right-hand value
# @return [Array<String>] +left+, as returned by Array#each
def output(key, left, right)
  left.each do |left_value|
    record = "#{key}#{@sep_out}#{left_value}#{@sep_out}#{right}"
    block_given? ? yield(record) : puts(record)
  end
end
|
#process_stream(input = STDIN, &blk) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/streaming_join/join.rb', line 34
# Reads a key-sorted stream of tagged records and dispatches each key group.
#
# Each input line is "<key><sep_in><side><sep_in><value>". Side 0 marks a
# left-hand record; any other side marks a right-hand record. Left values
# are buffered per key, and each right value is joined against them via
# #output. Left values whose key sees no right value go to #null_right.
# Right values with no buffered left values go to #null_left.
# NOTE(review): the logic requires that records arrive grouped by key, with
# left records before right records; presumably the streaming sort
# guarantees this, so confirm against the job setup.
#
# @param input [#each] source of lines (STDIN by default)
# @yield [String] optional; forwarded to #output, #null_left and #null_right
#   so joined records can be collected instead of printed
def process_stream(input = STDIN, &blk)
  last_key = key = nil
  last_side = nil
  left = []
  input.each do |line|
    record = line.chomp
    # Blank lines carry no key; previously they became a nil-key left record.
    next if record.empty?
    key, side, value = record.split(@sep_in, 3)
    new_key = last_key != key
    report 'keys' if new_key
    # A new key closes the previous group. If that group ended on left
    # records, they had no right-side match. This must also happen when the
    # new key starts with a right record; otherwise those left rows were lost.
    if new_key && last_side == 0
      report 'null right'
      null_right last_key, left, &blk
    end
    left = [] if new_key
    side = side.to_i
    if side == 0
      left << value
    elsif left.empty?
      report 'null left' if new_key
      null_left key, value, &blk
    else
      report 'left and right' if last_side == 0
      # Forward the caller's block; without it, #output always printed.
      output key, left, value, &blk
    end
    last_side = side
    last_key = key
  end
  # Flush a trailing group that consisted only of left records.
  if last_side == 0
    report 'null right'
    null_right(key, left, &blk)
  end
end
|
#report(detail) ⇒ Object
12
13
14
15
|
# File 'lib/streaming_join/join.rb', line 12
# Writes a counter line "reporter:counter:join,<detail>,1" to STDERR when
# reporting is enabled (@report truthy). This is the stderr format Hadoop
# Streaming reads as a counter increment (NOTE: assumed from the line format).
#
# @param detail [String] counter name within the "join" group
# @return [nil]
def report(detail)
  STDERR.puts "reporter:counter:join,#{detail},1" if @report
end
|