Class: Tapsoob::DataStreamKeyed
Constant Summary
Constants inherited
from DataStream
Tapsoob::DataStream::DEFAULT_CHUNKSIZE
Instance Attribute Summary collapse
Attributes inherited from DataStream
#db, #options, #state
Instance Method Summary
collapse
Methods inherited from DataStream
#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_chunksize, #fetch_data_in_database, #fetch_database, #fetch_file, #fetch_from_database, #fetch_rows, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats
Constructor Details
#initialize(db, state, opts = {}) ⇒ DataStreamKeyed
306
307
308
309
310
311
|
# File 'lib/tapsoob/data_stream.rb', line 306
# Builds a keyed data stream on top of the base DataStream.
#
# Seeds the stream state with the table's first order-by column as the
# primary key and a starting key filter of 0 (caller-provided state wins
# on conflict), defaults the chunk size, and starts with an empty row
# buffer.
def initialize(db, state, opts = {})
  super(db, state, opts)
  defaults = { :primary_key => order_by(state[:table_name]).first, :filter => 0 }
  @state = defaults.merge(@state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end
|
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
304
305
306
|
# File 'lib/tapsoob/data_stream.rb', line 304
# Read-only access to the rows prefetched from the database but not yet
# handed out (see #fetch_buffered) or trimmed (see #increment).
def buffer
  @buffer
end
|
Instance Method Details
#buffer_limit ⇒ Object
317
318
319
320
321
322
323
|
# File 'lib/tapsoob/data_stream.rb', line 317
# Upper key bound used when refilling the buffer.
#
# Normally this is state[:filter] (the key of the last row buffered).
# But when a previous consumer stopped behind the buffered head —
# state[:last_fetched] is set, sits below the filter, and the buffer has
# been drained — we rewind to the last key actually fetched so no rows
# are skipped on resume.
def buffer_limit
  rewound = state[:last_fetched] &&
            state[:last_fetched] < state[:filter] &&
            self.buffer.size == 0
  rewound ? state[:last_fetched] : state[:filter]
end
|
#calc_limit(chunksize) ⇒ Object
325
326
327
328
329
330
331
332
333
334
|
# File 'lib/tapsoob/data_stream.rb', line 325
# SQL LIMIT to use per query relative to the requested chunk size.
# Inside a Sinatra (server) process we over-fetch only 10%; standalone
# we grab three chunks' worth per round trip.
def calc_limit(chunksize)
  factor = defined?(Sinatra) ? 1.1 : 3
  (chunksize * factor).ceil
end
|
#fetch_buffered(chunksize) ⇒ Object
360
361
362
363
364
365
366
367
368
369
|
# File 'lib/tapsoob/data_stream.rb', line 360
# Returns up to +chunksize+ rows from the head of the buffer, refilling
# from the database first when it runs low. Records the primary-key
# value of the last row handed out in state[:last_fetched] (nil when
# nothing was available) so an interrupted stream can resume.
def fetch_buffered(chunksize)
  load_buffer(chunksize) if self.buffer.size < chunksize
  rows = buffer.slice(0, chunksize)
  state[:last_fetched] = rows.empty? ? nil : rows.last[primary_key]
  rows
end
|
#fetch_rows ⇒ Object
# Pulls the next chunk of buffered rows (a chunksize-sized slice) and
# normalises it for output via Tapsoob::Utils.format_data, passing the
# table's string columns through for encoding handling.
def fetch_rows
  rows = fetch_buffered(state[:chunksize]) || []
  Tapsoob::Utils.format_data(rows, :string_columns => string_columns)
end
#increment(row_count) ⇒ Object
381
382
383
384
|
# File 'lib/tapsoob/data_stream.rb', line 381
# Drops the first +row_count+ rows from the buffer after the caller has
# consumed them, returning the removed rows.
# (Assumes row_count is non-negative, as it is when it comes from a
# consumed chunk size.)
def increment(row_count)
  @buffer.shift(row_count)
end
|
#load_buffer(chunksize) ⇒ Object
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
# File 'lib/tapsoob/data_stream.rb', line 336
# Refills @buffer by querying the table in primary-key order until at
# least +chunksize+ new rows have been accumulated or the table is
# exhausted (an empty result set ends the loop).
#
# Each pass fetches up to calc_limit(chunksize) rows keyed strictly
# above buffer_limit, and advances state[:filter] to the key of the last
# buffered row so the next pass (or call) resumes after it.
# NOTE(review): Sequel::BasicObject.remove_methods! is presumably needed
# so `key.sql_number > buf_limit` resolves correctly inside the
# virtual-row filter block — confirm against the Sequel version in use.
def load_buffer(chunksize)
  Sequel::BasicObject.remove_methods!
  num = 0
  loop do
    limit = calc_limit(chunksize)
    key = primary_key
    buf_limit = buffer_limit
    # Virtual-row filter: WHERE key > buf_limit ORDER BY <order_by> LIMIT limit
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
    data = ds.all
    self.buffer += data
    num += data.size
    if data.size > 0
      # Remember the highest key seen so the next query starts after it.
      state[:filter] = self.buffer.last[ primary_key ]
    end
    break if num >= chunksize or data.size == 0
  end
end
|
#primary_key ⇒ Object
313
314
315
|
# File 'lib/tapsoob/data_stream.rb', line 313
# The primary-key column recorded in the stream state, as a Symbol
# suitable for row-hash lookups and Sequel expressions.
def primary_key
  key_name = state[:primary_key]
  key_name.to_sym
end
|
#verify_stream ⇒ Object
386
387
388
389
390
391
392
393
394
395
396
397
|
# File 'lib/tapsoob/data_stream.rb', line 386
# Re-anchors the stream against the live table: sets state[:filter] to
# the current maximum primary-key value and clears state[:last_fetched],
# so the next load_buffer starts from a known position.
# NOTE(review): ds.max(key.sql_number) issues MAX(key); the order clause
# on the dataset is presumably irrelevant to the aggregate — confirm
# against Sequel's Dataset#max documentation.
def verify_stream
  key = primary_key
  ds = table.order(*order_by)
  current_filter = ds.max(key.sql_number)
  state[:filter] = current_filter
  state[:last_fetched] = nil
  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end
|