Class: Taps::DataStreamKeyed
Constant Summary
Constants inherited from DataStream
Taps::DataStream::DEFAULT_CHUNKSIZE
Instance Attribute Summary collapse
Attributes inherited from DataStream
#db, #state
Instance Method Summary
collapse
Methods inherited from DataStream
#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_chunksize, #fetch_from_resource, #fetch_remote, #fetch_remote_in_server, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_remote_stream
Constructor Details
Returns a new instance of DataStreamKeyed.
249
250
251
252
253
254
|
# File 'lib/taps/data_stream.rb', line 249
# Builds a keyed data stream on top of the parent DataStream constructor.
#
# After delegating to the parent, the stream state is rebuilt from a set of
# defaults merged with the caller-supplied +state+ (caller values win):
# * :primary_key - first entry returned by order_by for state[:table_name]
# * :filter      - 0
# * :chunksize   - DEFAULT_CHUNKSIZE when the merged state has no truthy value
# The row buffer starts out as an empty array.
#
# @param db    forwarded unchanged to the parent constructor
# @param state [Hash] initial stream state; read with [] and merged over the defaults
def initialize(db, state)
  super(db, state)
  defaults = {
    :primary_key => order_by(state[:table_name]).first,
    :filter => 0
  }
  @state = defaults.merge(state)
  @state[:chunksize] = DEFAULT_CHUNKSIZE unless @state[:chunksize]
  @buffer = []
end
|
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
247
248
249
|
# File 'lib/taps/data_stream.rb', line 247
# Reader for the stream's in-memory row buffer.
#
# @return [Object] whatever is currently stored in @buffer
def buffer; @buffer; end
|
Instance Method Details
#buffer_limit ⇒ Object
260
261
262
263
264
265
266
|
# File 'lib/taps/data_stream.rb', line 260
# Chooses the key value that the next buffer load should start after.
#
# Returns state[:last_fetched] only when it is set, is smaller than
# state[:filter], and the buffer currently holds no rows. In every other case
# state[:filter] is returned.
#
# @return [Object] either state[:last_fetched] or state[:filter]
def buffer_limit
  last_fetched = state[:last_fetched]
  return state[:filter] unless last_fetched && last_fetched < state[:filter] && buffer.empty?

  last_fetched
end
|
#calc_limit(chunksize) ⇒ Object
268
269
270
271
272
273
274
275
276
277
|
# File 'lib/taps/data_stream.rb', line 268
# Computes how many rows a single buffer query may request.
#
# The chunk size is scaled by 1.1 when the Sinatra constant is defined and by
# 3 otherwise, then rounded up with ceil.
#
# @param chunksize [Numeric] requested chunk size
# @return [Integer] row limit for one query
def calc_limit(chunksize)
  factor = defined?(Sinatra) ? 1.1 : 3
  (chunksize * factor).ceil
end
|
#fetch_buffered(chunksize) ⇒ Object
303
304
305
306
307
308
309
310
311
312
|
# File 'lib/taps/data_stream.rb', line 303
# Returns up to +chunksize+ rows from the front of the buffer.
#
# When the buffer holds fewer than +chunksize+ rows, load_buffer is asked to
# top it up first. The returned rows are a copy of the leading slice; nothing
# is removed from the buffer here. state[:last_fetched] is updated to the
# primary-key value of the last returned row, or to nil when no rows were
# returned.
#
# @param chunksize [Integer] maximum number of rows to return
# @return [Array] the leading rows of the buffer
def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize
  rows = buffer[0, chunksize]
  state[:last_fetched] = rows.empty? ? nil : rows.last[primary_key]
  rows
end
|
#fetch_rows ⇒ Object
318
319
320
321
322
|
# File 'lib/taps/data_stream.rb', line 318
# Fetches the next chunk of rows and passes it through Taps::Utils.format_data.
#
# The chunk size comes from state[:chunksize]. A nil result from
# fetch_buffered is replaced by an empty array before formatting, and the
# stream's string_columns are handed over via the :string_columns option.
#
# @return [Object] whatever Taps::Utils.format_data returns
def fetch_rows
  rows = fetch_buffered(state[:chunksize]) || []
  Taps::Utils.format_data(rows, :string_columns => string_columns)
end
|
#import_rows(rows) ⇒ Object
314
315
316
|
# File 'lib/taps/data_stream.rb', line 314
# Imports a batch of rows into the stream's table.
#
# @param rows [Hash] batch read with rows[:header] and rows[:data]; both
#   values are passed, in that order, to table.import
# @return [Object] whatever table.import returns
def import_rows(rows)
  target = table
  header, data = rows[:header], rows[:data]
  target.import(header, data)
end
|
#increment(row_count) ⇒ Object
324
325
326
327
|
# File 'lib/taps/data_stream.rb', line 324
# Drops the first +row_count+ rows from the front of the buffer.
#
# @param row_count [Integer] number of leading rows to remove
# @return [Array, nil] the result of Array#slice! (the removed rows)
def increment(row_count)
  consumed = @buffer.slice!(0, row_count)
  consumed
end
|
#load_buffer(chunksize) ⇒ Object
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
|
# File 'lib/taps/data_stream.rb', line 279
# Tops up the row buffer by querying the table for rows whose primary key is
# greater than the current buffer limit, in order_by order.
#
# Each query requests at most calc_limit(chunksize) rows. After a non-empty
# batch, state[:filter] is advanced to the primary-key value of the last row in
# the buffer, so the next query continues after it. The loop stops once this
# call has added at least +chunksize+ rows, or as soon as a query returns no
# rows.
#
# @param chunksize [Numeric] minimum number of rows this call tries to add
def load_buffer(chunksize)
# NOTE(review): presumably clears inherited methods off Sequel::BasicObject so
# that names used inside the filter block below are not shadowed - confirm
# against the Sequel version in use.
Sequel::BasicObject.remove_methods!
# Running count of rows added to the buffer during this call.
num = 0
loop do
limit = calc_limit(chunksize)
# primary_key and buffer_limit are copied into locals before the query is
# built. NOTE(review): presumably required because the filter block is
# evaluated in Sequel's own context, where these methods are not reachable -
# confirm.
key = primary_key
buf_limit = buffer_limit
ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
data = ds.all
# Assigns a new array (old buffer + data) through the buffer= writer.
self.buffer += data
num += data.size
if data.size > 0
# Remember the highest key buffered so far; buffer_limit reads this value.
state[:filter] = self.buffer.last[ primary_key ]
end
break if num >= chunksize or data.size == 0
end
end
|
#primary_key ⇒ Object
256
257
258
|
# File 'lib/taps/data_stream.rb', line 256
# Name of the column this stream is keyed on.
#
# @return [Symbol] state[:primary_key] converted with to_sym
def primary_key
  key_name = state[:primary_key]
  key_name.to_sym
end
|
#verify_stream ⇒ Object
329
330
331
332
333
334
335
336
337
338
339
340
|
# File 'lib/taps/data_stream.rb', line 329
# Resynchronises the stream position with what is actually in the table.
#
# state[:filter] is set to the current maximum primary-key value of the table
# and state[:last_fetched] is cleared. When the table has no rows the MAX
# aggregate yields nil; in that case the filter falls back to 0, the same
# starting value the constructor uses, so that later key comparisons in
# buffer_limit and load_buffer never run against nil.
#
# @return [Object] the result of the final log.debug call
def verify_stream
  key = primary_key
  max_key = table.order(*order_by).max(key.sql_number)
  # An empty table has no maximum; restart from the default filter instead of nil.
  state[:filter] = max_key || 0
  state[:last_fetched] = nil
  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end
|