Class: Fairy::PGroupBy::DirectPQMergeSortBuffer::Merger

Inherits:
Fairy::PGroupBy::DirectMergeSortBuffer::Merger show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Constructor Details

#initialize(njob, buffers) ⇒ Merger

Returns a new instance of Merger.



1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
# File 'lib/fairy/node/p-group-by.rb', line 1283

def initialize(njob, buffers)
  @njob = njob
  @buffers = PriorityQueue.new
  buffers.each{|buf|
    cb = DirectMergeSortBuffer::CachedBuffer.new(@njob, buf)
    next if cb.eof?
    @buffers.push cb, cb.key
  }

  @key = nil
end

Instance Method Details

#each(&block) ⇒ Object



1295
1296
1297
1298
1299
1300
1301
# File 'lib/fairy/node/p-group-by.rb', line 1295

def each(&block)
  while !@buffers.empty?
    @key = @buffers.min_key.key
    values = DirectMergeSortBuffer::KeyValueStream.new(@key, self)
    block.call values
  end
end

#each_by_key(&block) ⇒ Object



1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
# File 'lib/fairy/node/p-group-by.rb', line 1303

def each_by_key(&block)
  while buf_min = @buffers.delete_min_return_key
    vv_key = buf_min.key
    unless  @key == vv_key
      @buffers.push buf_min, buf_min.key
      return
    end

    buf_min.each_by_same_key(&block)

    if buf_min.eof?
      buf_min.close!
      return
    end
  
    @buffers.push buf_min, buf_min.key
  end
end

#get_buf(values) ⇒ Object



1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
# File 'lib/fairy/node/p-group-by.rb', line 1323

def get_buf(values)
  unless buf_min = @buffers.delete_min_return_key
    values.push_eos
    return
  end

  vv_key = buf_min.key
  unless  @key == vv_key
    values.push_eos
    @buffers.push buf_min, buf_min.key
    return
  end

  vv = buf_min.shift_values
  if vv
    values.concat vv
  end
  if buf_min.eof?
    buf_min.close!
    return
  end
  
  @buffers.push buf_min, buf_min.key
end