Class: Taps::Pull

Inherits:
Operation show all
Defined in:
lib/taps/operation.rb

Instance Attribute Summary

Attributes inherited from Operation

#database_url, #opts, #remote_url, #session_uri

Instance Method Summary collapse

Methods inherited from Operation

#apply_table_filter, #catch_errors, #close_session, #completed_tables, #compression_disabled?, #db, #default_chunksize, #exclude_tables, #exiting?, factory, #format_number, #http_headers, #indexes_first?, #initialize, #log, #resuming?, #safe_database_url, #safe_remote_url, #safe_url, #server, #session_resource, #set_session, #setup_signal_trap, #skip_schema?, #store_session, #stream_state, #stream_state=, #table_filter, #verify_server

Constructor Details

This class inherits a constructor from Taps::Operation

Instance Method Details

#fetch_remote_tables_infoObject



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/taps/operation.rb', line 338

def fetch_remote_tables_info
  retries = 0
  max_retries = 10
  begin
    tables = ::OkJson.decode(session_resource['pull/table_names'].get(http_headers).to_s)
  rescue RestClient::Exception
    retries += 1
    retry if retries <= max_retries
    puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
    exit(1)
  end

  data = {}
  apply_table_filter(tables).each do |table_name|
    retries = 0
    begin
      count = Integer(session_resource['pull/table_count'].post({ table: table_name }, http_headers).to_s)
      data[table_name] = count
    rescue RestClient::Exception
      retries += 1
      retry if retries <= max_retries
      puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
      exit(1)
    end
  end
  data
end

#file_prefixObject



234
235
236
# File 'lib/taps/operation.rb', line 234

def file_prefix
  'pull'
end

#pull_dataObject



271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/taps/operation.rb', line 271

def pull_data
  puts 'Receiving data'

  puts "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each do |table_name, count|
    progress = ProgressBar.new(table_name.to_s, count)
    stream = Taps::DataStream.factory(db, chunksize: default_chunksize,
                                          table_name: table_name)
    pull_data_from_table(stream, progress)
  end
end

#pull_data_from_table(stream, progress) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/taps/operation.rb', line 296

def pull_data_from_table(stream, progress)
  loop do
    begin
      if exiting?
        store_session
        exit 0
      end

      size = stream.fetch_remote(session_resource['pull/table'], http_headers)
      break if stream.complete?
      progress.inc(size) unless exiting?
      stream.error = false
      self.stream_state = stream.to_hash
    rescue Taps::CorruptedData => e
      puts "Corrupted Data Received #{e.message}, retrying..."
      stream.error = true
      next
    end
  end

  progress.finish
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end

#pull_indexesObject



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/taps/operation.rb', line 366

def pull_indexes
  puts 'Receiving indexes'

  idxs = ::OkJson.decode(session_resource['pull/indexes'].get(http_headers).to_s)

  apply_table_filter(idxs).each do |table, indexes|
    next if indexes.empty?
    progress = ProgressBar.new(table, indexes.size)
    indexes.each do |idx|
      output = Taps::Utils.load_indexes(database_url, idx)
      output = output.to_s.strip
      puts output unless output.empty?
      progress.inc(1)
    end
    progress.finish
  end
end

#pull_partial_dataObject



284
285
286
287
288
289
290
291
292
293
294
# File 'lib/taps/operation.rb', line 284

def pull_partial_data
  return if stream_state == {}

  table_name = stream_state[:table_name]
  record_count = tables[table_name.to_s]
  puts "Resuming #{table_name}, #{format_number(record_count)} records"

  progress = ProgressBar.new(table_name.to_s, record_count)
  stream = Taps::DataStream.factory(db, stream_state)
  pull_data_from_table(stream, progress)
end

#pull_reset_sequencesObject



384
385
386
387
388
389
390
# File 'lib/taps/operation.rb', line 384

def pull_reset_sequences
  puts 'Resetting sequences'

  output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
  output = output.to_s.strip
  puts output unless output.empty?
end

#pull_schemaObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/taps/operation.rb', line 256

def pull_schema
  puts 'Receiving schema'

  progress = ProgressBar.new('Schema', tables.size)
  tables.each do |table_name, _count|
    schema_data = session_resource['pull/schema'].post({ table_name: table_name }, http_headers).to_s
    log.debug "Table: #{table_name}\n#{schema_data}\n"
    output = Taps::Utils.load_schema(database_url, schema_data)
    output = output.to_s.strip
    puts output unless output.empty?
    progress.inc(1)
  end
  progress.finish
end

#record_countObject



330
331
332
# File 'lib/taps/operation.rb', line 330

def record_count
  @record_count ||= remote_tables_info.values.inject(0) { |a, c| a += c }
end

#remote_tables_infoObject



334
335
336
# File 'lib/taps/operation.rb', line 334

def remote_tables_info
  opts[:remote_tables_info] ||= fetch_remote_tables_info
end

#runObject



242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/taps/operation.rb', line 242

def run
  catch_errors do
    unless resuming?
      pull_schema unless skip_schema?
      pull_indexes if indexes_first? && !skip_schema?
    end
    setup_signal_trap
    pull_partial_data if resuming?
    pull_data
    pull_indexes if !indexes_first? && !skip_schema?
    pull_reset_sequences
  end
end

#tablesObject



321
322
323
324
325
326
327
328
# File 'lib/taps/operation.rb', line 321

def tables
  h = {}
  remote_tables_info.each do |table_name, count|
    next if completed_tables.include?(table_name.to_s)
    h[table_name.to_s] = count
  end
  h
end

#to_hashObject



238
239
240
# File 'lib/taps/operation.rb', line 238

def to_hash
  super.merge(remote_tables_info: remote_tables_info)
end