Class: Join

Inherits:
Object
  • Object
show all
Defined in:
lib/streaming_join/join.rb

Overview

Base Join class; implements an inner join.

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Join

Returns a new instance of Join.



3
4
5
6
7
8
9
10
# File 'lib/streaming_join/join.rb', line 3

# Builds a joiner configured from the process environment.
#
# @param opts [Hash] options; only :report is read (default true). When
#   false, #report emits no counters.
#
# The output separator (streaming_join_output_separator) may be given
# either literally or as a character code:
#   * "\u001F" / "\1f" -- backslash (optionally followed by "u") and hex digits
#   * "9", "0001"      -- bare decimal digits, which are also read as hex
#                         (kept for backward compatibility: "10" is 0x10)
# Any other value is used verbatim.
def initialize opts = {}
  # Field separator of incoming records; presumably Hadoop exports
  # stream.map.output.field.separator under this name -- confirm.
  @sep_in  = ENV['stream_map_output_field_separator'] || "\t"
  @sep_out = ENV['streaming_join_output_separator'] || "\t"
  # Hex letters are only honoured after a backslash prefix, so that plain
  # separators such as "ab" or "ff" keep their literal meaning.
  if (m = @sep_out.match(/\A\\u?(\h+)\Z/) || @sep_out.match(/\A(\d+)\Z/))
    code = m[1].hex
    # Plain Integer#chr only covers 0..255; larger codes are Unicode code
    # points and need an explicit encoding.
    @sep_out = code < 256 ? code.chr : code.chr(Encoding::UTF_8)
  end
  @report  = opts.fetch :report, true
  # Column counts for each side; not used by this class in the visible
  # code -- presumably read by subclasses.
  @cols_l  = ENV['streaming_join_cols_left'].to_i
  @cols_r  = ENV['streaming_join_cols_right'].to_i
end

Instance Method Details

#null_left(key, right) ⇒ Object



28
29
# File 'lib/streaming_join/join.rb', line 28

# Hook invoked by #process_stream for a right-side record whose key had no
# left-side records. An inner join emits nothing for such a record;
# subclasses can override this to produce output.
#
# @param key [String] the join key
# @param right [String] the right-side value
# @return [nil]
def null_left(key, right)
  nil
end

#null_right(key, left) ⇒ Object



31
32
# File 'lib/streaming_join/join.rb', line 31

# Hook invoked by #process_stream for a key that had left-side records but
# no right-side records. An inner join emits nothing for them; subclasses
# can override this to produce output.
#
# @param key [String] the join key
# @param left [Array<String>] the buffered left-side values for the key
# @return [nil]
def null_right(key, left)
  nil
end

#output(key, left, right) ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/streaming_join/join.rb', line 17

# Emits one joined row for every buffered left value paired with a single
# right value. Each row is "key<sep>left<sep>right", joined with @sep_out.
#
# @param key [String] the join key
# @param left [Array<String>] buffered left-side values for the key
# @param right [String] the right-side value
# @yieldparam row [String] each joined row, when a block is given;
#   otherwise rows are written to stdout with puts
# @return [Array] +left+ (the receiver of #each)
def output key, left, right
  left.each do |left_value|
    row = "#{key}#{@sep_out}#{left_value}#{@sep_out}#{right}"
    block_given? ? yield(row) : puts(row)
  end
end

#process_stream(input = STDIN, &blk) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/streaming_join/join.rb', line 34

# Streams "key<sep>side<sep>value" records (split on @sep_in) and joins
# left-side records (side 0) with right-side records (any other side).
#
# Assumes records arrive grouped by key, with a key's left-side records
# before its right-side records (the original comments rely on this;
# presumably guaranteed by the job's sort settings -- confirm).
#
# For each key:
#   * left and right present -> #output once per right record
#   * right only             -> #null_left once per right record
#   * left only              -> #null_right once with all left values
#
# @param input [#each] line source, STDIN by default
# @param blk [Proc, nil] forwarded to #output / #null_left / #null_right,
#   so joined rows are yielded to it instead of printed
# @return [void]
def process_stream(input = STDIN, &blk)
  last_key = key = nil
  last_side = nil
  left = []

  input.each do |line|
    key, side, value = line.chomp.split(@sep_in, 3)
    new_key = last_key != key

    report 'keys' if new_key

    # Entering a new key while the previous key only produced left-side
    # records means that key never got a right side. This must be flushed
    # whichever side the current record is on; previously it was skipped
    # when the new key began with a right-side record.
    if new_key && last_side == 0
      report 'null right'
      null_right last_key, left, &blk
    end

    side = side.to_i
    if side == 0
      left = [] if new_key
      left << value
    elsif new_key || left.empty?
      # Right-side record with no left-side records for this key.
      report 'null left' if new_key
      null_left key, value, &blk
      left = []
    else
      # Count each key with both sides once, on its first right record.
      report 'left and right' if last_side == 0
      output key, left, value, &blk
    end

    last_side = side
    last_key = key
  end

  # If the final record was on the left, that key has no right side.
  if last_side == 0
    report 'null right'
    null_right key, left, &blk
  end
end

#report(detail) ⇒ Object



12
13
14
15
# File 'lib/streaming_join/join.rb', line 12

# Increments the Hadoop Streaming counter "join,<detail>" by 1 by writing a
# "reporter:counter:" line to STDERR. Does nothing when reporting was
# disabled via the :report option.
#
# @param detail [String] counter name within the "join" group
# @return [nil]
def report detail
  STDERR.puts "reporter:counter:join,#{detail},1" if @report
end