Class: JoinMapper

Inherits: Object
Defined in:
lib/streaming_join/join_mapper.rb

Instance Method Summary

Constructor Details

#initialize ⇒ JoinMapper

Returns a new instance of JoinMapper.



2
3
4
5
6
7
8
9
10
11
# File 'lib/streaming_join/join_mapper.rb', line 2

# Reads the input and output field separators from the environment and
# starts with no join sides registered.
#
# Each separator may be given literally (e.g. "\t", ",") or as a character
# code, which is converted to the single corresponding character:
#   * "\u" followed by hex digits, e.g. "\u0001" or "\u001f"
#   * optional "\" or "\u" followed by decimal digits, e.g. "1" or "\1"
# In both forms the digits are read as hexadecimal (String#hex).
# Both separators default to a tab when the variable is unset.
def initialize
  # use our own input field separator variable since the stock
  # stream variable can't handle control characters
  decode = lambda do |sep|
    code = sep[/\A\\u(\h+)\Z/, 1] || sep[/\A(?:\\u?)?(\d+)\Z/, 1]
    code ? code.hex.chr : sep
  end

  @sep_in  = decode.call(ENV['streaming_join_input_field_separator'] || "\t")
  # decode the output separator from its own value, not from @sep_in
  @sep_out = decode.call(ENV['stream_map_output_field_separator'] || "\t")

  @join = []
end

Instance Method Details

#add_opts(file_re, opts) ⇒ Object



29
30
31
32
33
34
# File 'lib/streaming_join/join_mapper.rb', line 29

# Merges +opts+ into every registered join side whose :file_re equals
# +file_re+, mutating those side hashes in place.
#
# Returns the full list of registered join sides.
def add_opts(file_re, opts)
  @join.each { |side_spec| side_spec.merge!(opts) if side_spec[:file_re] == file_re }
end

#add_side(file_re, *columns, &filter) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/streaming_join/join_mapper.rb', line 17

# Registers a join side for input files whose name matches +file_re+.
#
# +columns+ are the field indices to extract from each input line. The first
# index selects the join key (see process_stream). The optional block is
# stored as the side's :filter and is called with the extracted values.
# The side records the current input separator and its position in the
# registration order.
#
# Returns the newly registered side hash.
def add_side(file_re, *columns, &filter)
  {
    file_re: file_re,
    columns: columns,
    filter:  filter,
    sep:     @sep_in,
    side:    @join.size
  }.tap { |spec| @join.push(spec) }
end

#join_side ⇒ Object



36
37
38
39
40
41
42
# File 'lib/streaming_join/join_mapper.rb', line 36

# Returns the first registered join side (a Hash built by add_side) whose
# :file_re matches the name of the file this map task is reading.
#
# The file name comes from the map_input_file environment variable. If that
# is unset, it falls back to mapreduce_map_input_file, the name newer Hadoop
# Streaming versions use when exporting mapreduce.map.input.file.
#
# Raises RuntimeError when no registered side matches.
def join_side
  input_file = ENV['map_input_file'] || ENV['mapreduce_map_input_file']
  matched = @join.find { |j| input_file =~ j[:file_re] }
  return matched if matched

  raise "how do I handle input file '#{input_file}'?"
end

#process_stream(input = STDIN) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/streaming_join/join_mapper.rb', line 44

# Reads lines from +input+ and writes one tagged record per line to stdout
# for the join side selected by join_side.
#
# Each line is split on the side's separator, and the side's columns are
# extracted in order. The first extracted column is the join key: lines where
# it is missing are skipped. If the side has a filter, lines for which it
# returns a falsy value are also skipped. Each emitted record has the form
#   key <sep_out> side-index <sep_out> remaining columns joined by <sep_out>
# Missing non-key columns are emitted as empty strings.
def process_stream(input = STDIN)
  j = join_side
  cols   = j[:columns]
  filter = j[:filter]
  side   = j[:side]
  sep    = j[:sep]
  # String#split(" ") does awk-style whitespace splitting (collapses runs,
  # also splits on tabs); a literal-space regexp keeps " " a plain separator.
  splitter = sep == ' ' ? / / : sep

  input.each do |line|
    fields = line.chomp.split(splitter, -1)

    c = []
    cols.each_with_index do |col, i|
      value = fields[col]
      break if i.zero? && value.nil? # can't have nil key
      c << value
    end
    next if c.empty?
    next if filter && !filter.call(c)

    key, *rest = c
    puts "#{key}#{@sep_out}#{side}#{@sep_out}#{rest.join(@sep_out)}"
  end
end

#report(detail) ⇒ Object



13
14
15
# File 'lib/streaming_join/join_mapper.rb', line 13

# Writes one counter line to STDERR of the form
#   reporter:counter:join,<detail>,1
# presumably read by Hadoop Streaming as an increment of counter <detail>
# in group "join" (TODO confirm against the job's reporter setup).
def report(detail)
  counter_line = "reporter:counter:join,#{detail},1"
  STDERR.puts(counter_line)
end