Class: Tapsoob::DataStreamKeyed
- Inherits:
-
DataStream
- Object
- DataStream
- Tapsoob::DataStreamKeyed
- Defined in:
- lib/tapsoob/data_stream.rb
Constant Summary
Constants inherited from DataStream
Tapsoob::DataStream::DEFAULT_CHUNKSIZE
Instance Attribute Summary collapse
-
#buffer ⇒ Object
Returns the value of attribute buffer.
Attributes inherited from DataStream
Instance Method Summary collapse
- #buffer_limit ⇒ Object
- #calc_limit(chunksize) ⇒ Object
- #fetch_buffered(chunksize) ⇒ Object
-
#increment(row_count) ⇒ Object
def fetch_rows chunksize = state[:chunksize] Tapsoob::Utils.format_data(fetch_buffered(chunksize) || [], :string_columns => string_columns) end.
-
#initialize(db, state, opts = {}) ⇒ DataStreamKeyed
constructor
A new instance of DataStreamKeyed.
- #load_buffer(chunksize) ⇒ Object
- #primary_key ⇒ Object
- #verify_stream ⇒ Object
Methods inherited from DataStream
#complete?, #encode_rows, #error, #error=, factory, #fetch, #fetch_data_from_database, #fetch_data_to_database, #fetch_file, #fetch_rows, #import_rows, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats
Constructor Details
#initialize(db, state, opts = {}) ⇒ DataStreamKeyed
Returns a new instance of DataStreamKeyed.
270 271 272 273 274 275 |
# File 'lib/tapsoob/data_stream.rb', line 270

# Sets up a keyed stream on top of DataStream#initialize.
#
# After the parent constructor runs, defaults are layered underneath whatever
# is already in @state: :primary_key (first entry returned by order_by for
# state[:table_name]) and :filter (0). Values already present in @state win
# because they are merged over the defaults. :chunksize falls back to
# DEFAULT_CHUNKSIZE when unset, and the row buffer starts out empty.
def initialize(db, state, opts = {})
  super(db, state, opts)

  # the default primary key is always computed, even if @state already has one
  defaults = {
    :primary_key => order_by(state[:table_name]).first,
    :filter => 0
  }
  @state = defaults.merge(@state)
  @state[:chunksize] = DEFAULT_CHUNKSIZE unless @state[:chunksize]

  @buffer = []
end
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
268 269 270 |
# File 'lib/tapsoob/data_stream.rb', line 268

# Reader for the in-memory row buffer held in @buffer.
def buffer
  @buffer
end
Instance Method Details
#buffer_limit ⇒ Object
281 282 283 284 285 286 287 |
# File 'lib/tapsoob/data_stream.rb', line 281

# Picks the key value that load_buffer uses as the exclusive lower bound of
# its next query.
#
# Returns state[:last_fetched] when it is set, is smaller than
# state[:filter], and the buffer is currently empty; otherwise returns
# state[:filter].
def buffer_limit
  last_fetched = state[:last_fetched]
  rewind = last_fetched && last_fetched < state[:filter] && buffer.empty?
  rewind ? last_fetched : state[:filter]
end
#calc_limit(chunksize) ⇒ Object
289 290 291 292 293 294 295 296 297 298 |
# File 'lib/tapsoob/data_stream.rb', line 289

# Computes how many rows a single query should request for a given chunksize.
#
# When the Sinatra constant is defined the limit is chunksize plus 10%,
# rounded up; otherwise it is three times chunksize, rounded up.
#
# @param chunksize [Numeric] number of rows wanted per chunk
# @return [Integer] row limit for the query
def calc_limit(chunksize)
  # we want to not fetch more than is needed while we're
  # inside sinatra but locally we can select more than
  # is strictly needed
  if defined?(Sinatra)
    # Multiply by 11 and divide by 10.0 rather than multiplying by 1.1:
    # 1.1 is not exactly representable, so e.g. 100 * 1.1 evaluates to
    # 110.00000000000001 and would round up to 111 instead of 110.
    (chunksize * 11 / 10.0).ceil
  else
    (chunksize * 3).ceil
  end
end
#fetch_buffered(chunksize) ⇒ Object
324 325 326 327 328 329 330 331 332 333 |
# File 'lib/tapsoob/data_stream.rb', line 324

# Returns up to +chunksize+ rows from the front of the buffer without
# removing them, refilling the buffer first when it holds fewer than
# +chunksize+ rows.
#
# Also records the primary-key value of the last returned row in
# state[:last_fetched], or nil when no rows are returned.
def fetch_buffered(chunksize)
  load_buffer(chunksize) if buffer.size < chunksize

  chunk = buffer.slice(0, chunksize)
  state[:last_fetched] = chunk.empty? ? nil : chunk.last[primary_key]
  chunk
end
#increment(row_count) ⇒ Object
# Formats the next buffered chunk (state[:chunksize] rows at most) with
# Tapsoob::Utils.format_data; an empty array is formatted when
# fetch_buffered returns nil or false.
def fetch_rows
  rows = fetch_buffered(state[:chunksize]) || []
  Tapsoob::Utils.format_data(rows, :string_columns => string_columns)
end
345 346 347 348 |
# File 'lib/tapsoob/data_stream.rb', line 345

# Drops the first +row_count+ rows from the buffer and returns them.
def increment(row_count)
  # pop the rows we just successfully sent off the buffer
  sent = @buffer.slice!(0, row_count)
  sent
end
#load_buffer(chunksize) ⇒ Object
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/tapsoob/data_stream.rb', line 300

# Appends rows to the buffer, one query per pass, until this call has added
# at least +chunksize+ rows or a query returns nothing.
#
# Each query orders the table by order_by, keeps rows whose primary key is
# greater than buffer_limit, and is capped at calc_limit(chunksize) rows.
# After every non-empty fetch, state[:filter] is set to the primary-key
# value of the last buffered row.
def load_buffer(chunksize)
  # make sure BasicObject is not polluted by subsequent requires
  Sequel::BasicObject.remove_methods!

  fetched = 0
  loop do
    row_limit = calc_limit(chunksize)
    # we have to use local variables in order for the virtual row filter
    # to work correctly
    pk = primary_key
    lower_bound = buffer_limit
    query = table.order(*order_by).filter { pk.sql_number > lower_bound }.limit(row_limit)
    log.debug "DataStreamKeyed#load_buffer SQL -> #{query.sql}"

    rows = query.all
    self.buffer += rows
    fetched += rows.size
    # keep a record of the last primary key value in the buffer
    state[:filter] = buffer.last[primary_key] unless rows.empty?

    break if fetched >= chunksize || rows.empty?
  end
end
#primary_key ⇒ Object
277 278 279 |
# File 'lib/tapsoob/data_stream.rb', line 277

# Returns state[:primary_key] converted to a Symbol.
def primary_key
  key_name = state[:primary_key]
  key_name.to_sym
end
#verify_stream ⇒ Object
350 351 352 353 354 355 356 357 358 359 360 361 |
# File 'lib/tapsoob/data_stream.rb', line 350

# Resets the stream position from the table's current contents:
# state[:filter] becomes the maximum primary-key value found in the table
# and state[:last_fetched] is cleared. The resulting state is logged at
# debug level.
def verify_stream
  pk = primary_key
  max_key = table.order(*order_by).max(pk.sql_number)

  # set the current filter to the max of the primary key
  state[:filter] = max_key
  # clear out the last_fetched value so it can restart from scratch
  state[:last_fetched] = nil

  log.debug "DataStreamKeyed#verify_stream -> state: #{state.inspect}"
end