Class: Fairy::PGroupBy::DirectPQMergeSortBuffer::Merger
- Inherits: Fairy::PGroupBy::DirectMergeSortBuffer::Merger
- Inheritance chain: Object → Fairy::PGroupBy::DirectMergeSortBuffer::Merger → Fairy::PGroupBy::DirectPQMergeSortBuffer::Merger
- Defined in:
- lib/fairy/node/p-group-by.rb
Instance Method Summary
- #each(&block) ⇒ Object
- #each_by_key(&block) ⇒ Object
- #get_buf(values) ⇒ Object
- #initialize(njob, buffers) ⇒ Merger (constructor): returns a new instance of Merger.
Constructor Details
#initialize(njob, buffers) ⇒ Merger
Returns a new instance of Merger.
1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 |
# File 'lib/fairy/node/p-group-by.rb', line 1283
#
# Sets up the merger: every entry of +buffers+ is wrapped in a
# DirectMergeSortBuffer::CachedBuffer and, unless that wrapper is already
# at eof, pushed onto a PriorityQueue with the wrapper's current key as
# its priority.
#
# @param njob    passed through unchanged to each CachedBuffer
# @param buffers enumerable of raw buffers to merge
#
# NOTE(review): wrappers that are at eof right after construction are
# simply dropped here without close!, whereas each_by_key and get_buf call
# close! on an exhausted buffer -- confirm nothing needs releasing.
def initialize(njob, buffers)
  @njob = njob
  @key = nil
  @buffers = PriorityQueue.new

  buffers.each do |raw|
    cached = DirectMergeSortBuffer::CachedBuffer.new(@njob, raw)
    @buffers.push(cached, cached.key) unless cached.eof?
  end
end
Instance Method Details
#each(&block) ⇒ Object
1295 1296 1297 1298 1299 1300 1301 |
# File 'lib/fairy/node/p-group-by.rb', line 1295
#
# Yields one DirectMergeSortBuffer::KeyValueStream per pass for as long as
# the queue holds buffers. Before each yield, @key is set to the key of the
# buffer returned by the queue's min_key.
#
# The loop itself never removes anything from @buffers; buffers are only
# taken out by each_by_key / get_buf. Presumably the yielded stream drives
# those methods when it is consumed -- verify against KeyValueStream.
def each(&block)
  until @buffers.empty?
    @key = @buffers.min_key.key
    block.call(DirectMergeSortBuffer::KeyValueStream.new(@key, self))
  end
end
#each_by_key(&block) ⇒ Object
1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 |
# File 'lib/fairy/node/p-group-by.rb', line 1303
#
# Passes +block+ to each_by_same_key on every queued buffer whose current
# key equals @key, then returns nil.
#
# Buffers are taken from the queue one at a time via
# delete_min_return_key (presumably smallest key first). A buffer whose key
# differs from @key is put back and ends the enumeration. A buffer that is
# not exhausted after each_by_same_key is re-queued under its new key.
#
# Fix: an exhausted buffer used to trigger +return+, which ended the
# enumeration even though other queued buffers could still hold entries
# for @key. It now closes the buffer and continues with the next one.
def each_by_key(&block)
  while (buf_min = @buffers.delete_min_return_key)
    unless @key == buf_min.key
      # Not part of the current key group: restore it and stop.
      @buffers.push buf_min, buf_min.key
      return
    end

    buf_min.each_by_same_key(&block)

    if buf_min.eof?
      # Exhausted: release it, but keep draining the other buffers.
      buf_min.close!
      next
    end

    @buffers.push buf_min, buf_min.key
  end
end
#get_buf(values) ⇒ Object
1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 |
# File 'lib/fairy/node/p-group-by.rb', line 1323
#
# Performs one fetch step for +values+ against the queued buffers.
#
# * Queue empty: push_eos on +values+, return nil.
# * Head buffer's key differs from @key: push_eos on +values+, put the
#   buffer back under its key, return nil.
# * Otherwise: append the result of shift_values (when non-nil) to
#   +values+. If the buffer is then at eof it is closed and nil is
#   returned -- no eos is pushed in that step. If it is not at eof it is
#   re-queued under its new key and the result of that push is returned.
#
# @param values receiver of concat / push_eos (presumably the
#   KeyValueStream created in #each -- verify against caller)
def get_buf(values)
  head = @buffers.delete_min_return_key

  if !head
    values.push_eos
    nil
  elsif @key == head.key
    chunk = head.shift_values
    values.concat(chunk) if chunk

    if head.eof?
      head.close!
      nil
    else
      @buffers.push(head, head.key)
    end
  else
    values.push_eos
    @buffers.push(head, head.key)
    nil
  end
end