Class: Taps::DataStreamKeyed

Inherits:
DataStream show all
Defined in:
lib/taps/data_stream.rb

Instance Attribute Summary collapse

Attributes inherited from DataStream

#db, #state

Instance Method Summary collapse

Methods inherited from DataStream

#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_chunksize, #fetch_from_resource, #fetch_remote, #fetch_remote_in_server, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats

Constructor Details

#initialize(db, state) ⇒ DataStreamKeyed

Returns a new instance of DataStreamKeyed.



223
224
225
226
227
# File 'lib/taps/data_stream.rb', line 223

# Sets up a keyed data stream: seeds the stream state with the table's
# first order-by column as the primary key (unless the caller already
# supplied one), a starting :filter of 0, and an empty local row buffer.
def initialize(db, state)
	super(db, state)
	defaults = {
		:primary_key => order_by(state[:table_name]).first,
		:filter => 0
	}
	@state = defaults.merge(state)
	@buffer = []
end

Instance Attribute Details

#buffer ⇒ Object

Returns the value of attribute buffer.



221
222
223
# File 'lib/taps/data_stream.rb', line 221

# Read accessor for the local row cache: filled by #load_buffer,
# handed out by #fetch_buffered, and drained by #increment.
def buffer
  @buffer
end

Instance Method Details

#buffer_limit ⇒ Object



233
234
235
236
237
238
239
# File 'lib/taps/data_stream.rb', line 233

# Returns the primary-key value to resume fetching from. Normally that is
# state[:filter] (last key value already buffered); but when a previous
# chunk was only partially acknowledged (state[:last_fetched] is behind
# the filter) and the buffer has been drained, back up to the last key
# actually fetched so no rows are skipped.
def buffer_limit
	last_sent = state[:last_fetched]
	resume_from_last = last_sent && last_sent < state[:filter] && buffer.empty?
	resume_from_last ? last_sent : state[:filter]
end

#calc_limit(chunksize) ⇒ Object



241
242
243
244
245
246
247
248
249
250
# File 'lib/taps/data_stream.rb', line 241

# Decides how many rows to select for one buffering pass. Inside the
# Sinatra server the overshoot is kept small (10%) so we don't hold more
# rows in memory than needed; locally we over-select up to 3x so fewer
# queries are issued.
def calc_limit(chunksize)
	factor = defined?(Sinatra) ? 1.1 : 3
	(chunksize * factor).ceil
end

#fetch_buffered(chunksize) ⇒ Object



276
277
278
279
280
281
282
283
284
285
# File 'lib/taps/data_stream.rb', line 276

# Returns up to chunksize rows from the head of the buffer, refilling it
# from the database first when it runs short. Records the primary-key
# value of the last row handed out in state[:last_fetched] (nil when no
# rows were available) so a resumed transfer knows where it stopped.
# The rows are NOT removed here; #increment pops them once they have been
# sent successfully.
def fetch_buffered(chunksize)
	load_buffer(chunksize) if buffer.size < chunksize
	rows = buffer.slice(0, chunksize)
	state[:last_fetched] = rows.empty? ? nil : rows.last[primary_key]
	rows
end

#fetch_rows ⇒ Object



291
292
293
294
295
# File 'lib/taps/data_stream.rb', line 291

# Pulls the next chunk (state[:chunksize] rows) through the buffer and
# packages it in the wire format, flagging which columns are strings.
def fetch_rows
	rows = fetch_buffered(state[:chunksize]) || []
	Taps::Utils.format_data(rows, :string_columns => string_columns)
end

#import_rows(rows) ⇒ Object



287
288
289
# File 'lib/taps/data_stream.rb', line 287

# Bulk-inserts a decoded chunk into the destination table. +rows+ is a
# hash with :header (column names) and :data (arrays of row values),
# passed straight to Sequel's multi-row import.
def import_rows(rows)
	header, data = rows.values_at(:header, :data)
	table.import(header, data)
end

#increment(row_count) ⇒ Object



297
298
299
300
# File 'lib/taps/data_stream.rb', line 297

# Discards the first row_count rows from the buffer — called after a
# chunk has been confirmed delivered so those rows are never re-sent.
# Returns the removed rows.
def increment(row_count)
	@buffer.shift(row_count)
end

#load_buffer(chunksize) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/taps/data_stream.rb', line 252

# Fills the buffer with at least +chunksize+ rows, or stops early when the
# table is exhausted. Each pass selects rows whose primary key is strictly
# greater than buffer_limit, ordered by the table's order-by columns, so
# successive passes resume where the previous one stopped and update
# state[:filter] to the last key value buffered.
def load_buffer(chunksize)
	# make sure BasicObject is not polluted by subsequent requires
	Sequel::BasicObject.remove_methods!

	num = 0
	loop do
		# over-select per calc_limit so fewer round trips are needed
		limit = calc_limit(chunksize)
		# we have to use local variables in order for the virtual row filter to work correctly
		key = primary_key
		buf_limit = buffer_limit
		# effectively: WHERE key > buf_limit ORDER BY <order_by> LIMIT limit
		ds = table.order(*order_by).filter { key.sql_number > buf_limit }.limit(limit)
		log.debug "DataStreamKeyed#load_buffer SQL -> #{ds.sql}"
		data = ds.all
		self.buffer += data
		num += data.size
		if data.size > 0
			# keep a record of the last primary key value in the buffer
			state[:filter] = self.buffer.last[ primary_key ]
		end

		# done once enough rows are buffered or the query came back empty
		break if num >= chunksize or data.size == 0
	end
end

#primary_key ⇒ Object



229
230
231
# File 'lib/taps/data_stream.rb', line 229

# The column this stream is keyed on, as a Symbol. state[:primary_key] is
# seeded in #initialize from the first order-by column unless the caller
# supplied one explicitly.
def primary_key
	state[:primary_key].to_sym
end