Class: Fairy::PGroupBy::DirectMergeSortBuffer::Merger

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Constructor Details

#initialize(njob, buffers, cached_buffer_class = CachedBuffer) ⇒ Merger

Returns a new instance of Merger.



975
976
977
978
979
980
# File 'lib/fairy/node/p-group-by.rb', line 975

# Builds a merger over a set of source buffers.
#
# Every source buffer is wrapped with +cached_buffer_class+, wrappers that
# already report +eof?+ are discarded, and the remaining wrappers are kept
# in ascending order of their current +key+. The merger's current key
# starts out as +nil+.
#
# @param njob value stored on the merger and handed to each wrapper's constructor
# @param buffers collection of source buffers to wrap
# @param cached_buffer_class class instantiated as +new(njob, buffer)+ for each source
def initialize(njob, buffers, cached_buffer_class = CachedBuffer)
  @njob = njob

  wrapped = buffers.map { |source| cached_buffer_class.new(njob, source) }
  pending = wrapped.reject { |cached| cached.eof? }
  @buffers = pending.sort_by { |cached| cached.key }

  @key = nil
end

Instance Method Details

#each(&block) ⇒ Object



982
983
984
985
986
987
988
# File 'lib/fairy/node/p-group-by.rb', line 982

# Calls the block once per iteration for as long as any buffer is pending.
#
# On each iteration the key of the first pending buffer becomes the current
# key (@key), and the block receives a KeyValueStream built from that key
# and this merger. The loop only terminates once @buffers is empty, so
# progress depends on the buffers being drained elsewhere (e.g. through
# #each_by_key or #get_buf).
def each(&block)
  until @buffers.empty?
    @key = @buffers.first.key
    block.call(KeyValueStream.new(@key, self))
  end
end

#each_by_key(&block) ⇒ Object



990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
# File 'lib/fairy/node/p-group-by.rb', line 990

# Drains, from the front of @buffers, every buffer whose key equals the
# current key (@key), forwarding the block to each buffer's
# +each_by_same_key+.
#
# Stops as soon as the front buffer's key differs from @key (that buffer is
# put back at the front) or when no buffers remain. Buffers that reach EOF
# are closed and dropped. A buffer that still has data goes back to the
# front if its key is unchanged; otherwise it is re-inserted after the last
# buffer whose key is <= its new key.
def each_by_key(&block)
  while (head = @buffers.shift)
    key_before = head.key
    unless @key == key_before
      # Front buffer belongs to a later key: restore it and stop.
      @buffers.unshift(head)
      break
    end

    head.each_by_same_key(&block)

    if head.eof?
      head.close!
    elsif key_before == head.key
      # Key unchanged after the pass: keep the buffer at the front.
      @buffers.unshift(head)
    else
      # Key moved on: insert just after the last buffer with key <= the new
      # key, or at the front when there is none.
      slot = @buffers.rindex { |other| other.key <= head.key }
      @buffers.insert(slot ? slot + 1 : 0, head)
    end
  end
end

#get_buf(values) ⇒ Object



1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
# File 'lib/fairy/node/p-group-by.rb', line 1014

# Moves the next batch of values for the current key (@key) from the front
# buffer into +values+.
#
# Calls +values.push_eos+ when no buffer remains or when the front buffer's
# key differs from @key (that buffer is put back at the front). Otherwise
# the result of the buffer's +shift_values+ is appended to +values+ via
# +concat+ when it is non-nil. A buffer that reaches EOF is closed and
# dropped; a buffer with remaining data is re-inserted after the last
# buffer whose key is <= its own key.
#
# @param values receiver of +concat+ / +push_eos+
# @return nil on the early exits, otherwise the @buffers array
def get_buf(values)
  head = @buffers.shift
  unless head
    values.push_eos
    return
  end

  unless @key == head.key
    # Front buffer belongs to a later key: signal end and restore it.
    values.push_eos
    @buffers.unshift(head)
    return
  end

  batch = head.shift_values
  values.concat(batch) if batch

  if head.eof?
    head.close!
    return
  end

  # Insert just after the last buffer with key <= this buffer's key, or at
  # the front when there is none.
  slot = @buffers.rindex { |other| other.key <= head.key }
  @buffers.insert(slot ? slot + 1 : 0, head)
end