Class: Taps::DataStreamKeyed

Inherits:
DataStream
Defined in:
lib/taps/data_stream.rb

Constant Summary

Constants inherited from DataStream

Taps::DataStream::DEFAULT_CHUNKSIZE

Instance Attribute Summary

Attributes inherited from DataStream

#db, #state

Instance Method Summary

Methods inherited from DataStream

#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_chunksize, #fetch_from_resource, #fetch_remote, #fetch_remote_in_server, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_remote_stream

Constructor Details

#initialize(db, state) ⇒ DataStreamKeyed

Returns a new instance of DataStreamKeyed.



# File 'lib/taps/data_stream.rb', line 248

def initialize(db, state)
  super(db, state)
  @state = { primary_key: order_by(state[:table_name]).first, filter: 0 }.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @buffer = []
end
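
A minimal construction sketch (the connection URL, table name, and chunk size are illustrative; it assumes a Sequel database handle and a table whose key ordering DataStream#order_by can discover):

require 'sequel'
require 'taps/data_stream'

db     = Sequel.connect('sqlite://example.db')   # hypothetical database
stream = Taps::DataStreamKeyed.new(db, { table_name: :posts, chunksize: 500 })

stream.state[:chunksize]   # => 500
stream.buffer              # => []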

Instance Attribute Details

#buffer ⇒ Object

Returns the value of attribute buffer.



# File 'lib/taps/data_stream.rb', line 246

def buffer
  @buffer
end

Instance Method Details

#buffer_limit ⇒ Object



# File 'lib/taps/data_stream.rb', line 259

def buffer_limit
  # resume just above the last row actually fetched when the buffer has been
  # drained and that row lags behind the filter; otherwise keep paging from
  # the filter position
  if state[:last_fetched] && (state[:last_fetched] < state[:filter]) && buffer.empty?
    state[:last_fetched]
  else
    state[:filter]
  end
end
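
An illustration of the two branches, reusing the hypothetical stream from the constructor sketch (the state values are made up):

# the buffer has been drained and the last row actually fetched sits
# behind the filter, so the next load resumes from the confirmed position
stream.state[:filter]       = 800
stream.state[:last_fetched] = 650
stream.buffer.clear
stream.buffer_limit   # => 650

# in every other case paging simply continues from the filter
stream.state[:last_fetched] = nil
stream.buffer_limit   # => 800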

#calc_limit(chunksize) ⇒ Object



# File 'lib/taps/data_stream.rb', line 267

def calc_limit(chunksize)
  # we want to not fetch more than is needed while we're
  # inside sinatra but locally we can select more than
  # is strictly needed
  if defined?(Sinatra)
    (chunksize * 1.1).ceil
  else
    (chunksize * 3).ceil
  end
end
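
The arithmetic for an illustrative chunk size of 1,000: the Sinatra-hosted server side over-fetches only slightly, while the local side prefetches three chunks' worth of rows.

chunksize = 1_000
(chunksize * 1.1).ceil   # => 1100  rows selected when Sinatra is loaded (server side)
(chunksize * 3).ceil     # => 3000  rows selected locally (client side)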

#fetch_buffered(chunksize) ⇒ Object



# File 'lib/taps/data_stream.rb', line 299

def fetch_buffered(chunksize)
  # top up the buffer when it cannot satisfy a full chunk, then serve the
  # next chunk and remember the primary key of the last row handed out
  load_buffer(chunksize) if self.buffer.size < chunksize
  rows = buffer.slice(0, chunksize)
  state[:last_fetched] = (rows.last[primary_key] unless rows.empty?)
  rows
end

#fetch_rows ⇒ Object



# File 'lib/taps/data_stream.rb', line 310

def fetch_rows
  chunksize = state[:chunksize]
  Taps::Utils.format_data(fetch_buffered(chunksize) || [],
                          string_columns: string_columns)
end

#import_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 306

def import_rows(rows)
  table.import(rows[:header], rows[:data])
end
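
Judging from the call above, the rows hash produced by #fetch_rows carries at least a :header array of column names and a :data array of row arrays; a sketch with made-up values:

rows = {
  header: [:id, :title],
  data:   [[1, 'First post'], [2, 'Second post']]
}
stream.import_rows(rows)   # bulk insert via Sequel's Dataset#import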

#increment(row_count) ⇒ Object



# File 'lib/taps/data_stream.rb', line 316

def increment(row_count)
  # pop the rows we just successfully sent off the buffer
  @buffer.slice!(0, row_count)
end
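
A send-then-confirm loop sketched from the methods above; push_rows is hypothetical and stands in for whatever actually transmits the chunk:

loop do
  rows = stream.fetch_buffered(stream.state[:chunksize])
  break if rows.empty?
  push_rows(rows)               # hypothetical transport call
  stream.increment(rows.size)   # drop only the rows that were confirmed sent
end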

#load_buffer(chunksize) ⇒ Object



# File 'lib/taps/data_stream.rb', line 278

def load_buffer(chunksize)
  num = 0
  loop do
    limit = calc_limit(chunksize)
    # we have to use local variables in order for the virtual row filter to work correctly
    key = primary_key
    buf_limit = buffer_limit
    ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
    data = ds.all
    self.buffer += data
    num += data.size
    unless data.empty?
      # keep a record of the last primary key value in the buffer
      state[:filter] = self.buffer.last[primary_key]
    end

    break if (num >= chunksize) || data.empty?
  end
end
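
Each pass of the loop is keyset pagination: it selects only rows whose primary key is greater than the largest key already buffered, rather than paging with OFFSET. A hand-written equivalent of the dataset it builds (table name and values are illustrative; it leans on the same Sequel calls the method itself uses, including Symbol#sql_number and Dataset#filter):

key      = :id      # the table's primary key
last_key = 650      # what buffer_limit would return at this point
limit    = 1_100    # what calc_limit(chunksize) would return

ds = db[:posts].order(key).filter { key.sql_number > last_key }.limit(limit)
puts ds.sql         # the query the method logs at debug level
rows = ds.all       # the slice that gets appended to the buffer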

#primary_key ⇒ Object



# File 'lib/taps/data_stream.rb', line 255

def primary_key
  state[:primary_key].to_sym
end

#verify_stream ⇒ Object



# File 'lib/taps/data_stream.rb', line 321

def verify_stream
  key = primary_key
  ds = table.order(*order_by)
  current_filter = ds.max(key.sql_number)

  # set the current filter to the max of the primary key
  state[:filter] = current_filter
  # clear out the last_fetched value so it can restart from scratch
  state[:last_fetched] = nil

  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end
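
After a verification pass the state ends up as sketched below (the maximum key value is illustrative):

db[:posts].max(:id)           # => 9_999
stream.verify_stream
stream.state[:filter]         # => 9_999
stream.state[:last_fetched]   # => nil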