Class: JoinMapper
- Inherits:
-
Object
- Object
- JoinMapper
- Defined in:
- lib/streaming_join/join_mapper.rb
Instance Method Summary collapse
- #add_opts(file_re, opts) ⇒ Object
- #add_side(file_re, *columns, &filter) ⇒ Object
-
#initialize ⇒ JoinMapper
constructor
A new instance of JoinMapper.
- #join_side ⇒ Object
- #process_stream(input = STDIN) ⇒ Object
- #report(detail) ⇒ Object
Constructor Details
#initialize ⇒ JoinMapper
Returns a new instance of JoinMapper.
2 3 4 5 6 7 8 9 10 11 |
# File 'lib/streaming_join/join_mapper.rb', line 2
# Reads the input and output field separators from the environment and
# starts with an empty list of join sides.
#
# Both separators default to a tab. A value made up only of digits,
# optionally prefixed with "\" or "\u" (for example "01", "\01" or
# "\u0001"), is interpreted as a hexadecimal character code so that control
# characters can be configured.
def initialize
  coded_char = /\A(?:\\u?)?(\d+)\Z/

  # use our own input field separator variable since the stock
  # stream variable can't handle control characters
  @sep_in = ENV['streaming_join_input_field_separator'] || "\t"
  @sep_in = $1.hex.chr if @sep_in =~ coded_char

  @sep_out = ENV['stream_map_output_field_separator'] || "\t"
  # Decode the output separator from its own value; testing @sep_in here
  # would leave a coded output separator undecoded.
  @sep_out = $1.hex.chr if @sep_out =~ coded_char

  @join = []
end
Instance Method Details
#add_opts(file_re, opts) ⇒ Object
29 30 31 32 33 34 |
# File 'lib/streaming_join/join_mapper.rb', line 29
# Merges +opts+ into every registered join side whose :file_re equals
# +file_re+. Sides registered under a different pattern are left untouched.
#
# @param file_re [Object] pattern the side was registered with
# @param opts [Hash] entries to merge into each matching side
# @return [Array<Hash>] the list of registered sides
def add_opts(file_re, opts)
  @join.each do |side|
    side.merge!(opts) if side[:file_re] == file_re
  end
end
#add_side(file_re, *columns, &filter) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/streaming_join/join_mapper.rb', line 17
# Registers one side of the join and returns its description.
#
# The side's index (:side) is its position in registration order, and its
# field separator (:sep) starts out as the mapper-wide input separator.
#
# @param file_re [Object] pattern matched against the input file name
# @param columns [Array] field indexes to emit; the first one is the key
# @param filter [Proc, nil] optional block stored with the side
# @return [Hash] the newly registered side
def add_side(file_re, *columns, &filter)
  entry = {
    file_re: file_re,
    columns: columns,
    filter: filter,
    sep: @sep_in,
    side: @join.size
  }
  @join.push(entry)
  entry
end
#join_side ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/streaming_join/join_mapper.rb', line 36
# Returns the first registered side whose :file_re matches the name of the
# file currently being read.
#
# The file name is taken from the map_input_file environment variable,
# falling back to mapreduce_map_input_file (the renamed form of the same
# job property) when the legacy variable is not set.
#
# @return [Hash] the matching side
# @raise [RuntimeError] when no registered side matches the input file
def join_side
  input_file = ENV['map_input_file'] || ENV['mapreduce_map_input_file']
  side = @join.find { |j| input_file =~ j[:file_re] }
  return side if side

  raise "how do I handle input file '#{input_file}'?"
end
#process_stream(input = STDIN) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/streaming_join/join_mapper.rb', line 44
# Maps every line of +input+ to a join record and prints it.
#
# Each line is split on the side's separator and the side's columns are
# picked out; the first column is the join key. The printed record is the
# key, the side index, and the remaining columns, joined by the output
# separator. Lines without a key, and lines rejected by the side's filter,
# are skipped.
#
# @param input [#each] source of lines (defaults to STDIN)
# @return [Object] whatever +input.each+ returns
def process_stream(input = STDIN)
  j = join_side
  cols = j[:columns]
  filter = j[:filter]
  side = j[:side]
  sep = j[:sep]

  input.each do |line|
    # limit -1 keeps trailing empty fields
    fields = line.chomp.split(sep, -1)

    # can't have nil key (nor a record with no columns at all)
    next if cols.empty? || fields[cols.first].nil?

    # missing non-key fields stay nil and are printed as empty strings
    c = cols.map { |col| fields[col] }
    next if filter && !filter.call(c)

    o = "#{c[0]}#{@sep_out}#{side}#{@sep_out}"
    o << c[1...c.length].join(@sep_out)
    puts o
  end
end
#report(detail) ⇒ Object
13 14 15 |
# File 'lib/streaming_join/join_mapper.rb', line 13
# Writes a "reporter:counter:join,<detail>,1" line to standard error.
# NOTE(review): presumably consumed by Hadoop Streaming as a +1 increment of
# counter +detail+ in group "join" - confirm against the streaming docs.
#
# @param detail [#to_s] counter name interpolated into the message
def report(detail)
  counter_line = "reporter:counter:join,#{detail},1"
  STDERR.puts(counter_line)
end