Class: Taps::Pull
Instance Attribute Summary
Attributes inherited from Operation
#database_url, #opts, #remote_url, #session_uri
Instance Method Summary
collapse
Methods inherited from Operation
#apply_table_filter, #close_session, #completed_tables, #compression_disabled?, #db, #default_chunksize, #exiting?, factory, #format_number, #http_headers, #indexes_first?, #initialize, #log, #resuming?, #safe_database_url, #safe_remote_url, #safe_url, #server, #session_resource, #set_session, #setup_signal_trap, #store_session, #stream_state, #stream_state=, #table_filter, #verify_server
Instance Method Details
#fetch_remote_tables_info ⇒ Object
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
|
# File 'lib/taps/operation.rb', line 321
def fetch_remote_tables_info
retries = 0
max_retries = 10
begin
tables = JSON.load(session_resource['pull/table_names'].get().to_s)
rescue RestClient::Exception
retries += 1
retry if retries <= max_retries
puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
exit(1)
end
data = {}
apply_table_filter(tables).each do |table_name|
retries = 0
begin
count = session_resource['pull/table_count'].post({:table => table_name}, ).to_s.to_i
data[table_name] = count
rescue RestClient::Exception
retries += 1
retry if retries <= max_retries
puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
exit(1)
end
end
data
end
|
#file_prefix ⇒ Object
203
204
205
|
# File 'lib/taps/operation.rb', line 203
def file_prefix
"pull"
end
|
#pull_data ⇒ Object
252
253
254
255
256
257
258
259
260
261
262
263
264
265
|
# File 'lib/taps/operation.rb', line 252
def pull_data
puts "Receiving data"
puts "#{tables.size} tables, #{format_number(record_count)} records"
tables.each do |table_name, count|
progress = ProgressBar.new(table_name.to_s, count)
stream = Taps::DataStream.factory(db, {
:chunksize => default_chunksize,
:table_name => table_name
})
pull_data_from_table(stream, progress)
end
end
|
#pull_data_from_table(stream, progress) ⇒ Object
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
|
# File 'lib/taps/operation.rb', line 279
def pull_data_from_table(stream, progress)
loop do
begin
if exiting?
store_session
exit 0
end
size = stream.fetch_remote(session_resource['pull/table'], )
break if stream.complete?
progress.inc(size) unless exiting?
stream.error = false
self.stream_state = stream.to_hash
rescue DataStream::CorruptedData => e
puts "Corrupted Data Received #{e.message}, retrying..."
stream.error = true
next
end
end
progress.finish
completed_tables << stream.table_name.to_s
self.stream_state = {}
end
|
#pull_indexes ⇒ Object
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
|
# File 'lib/taps/operation.rb', line 349
def pull_indexes
puts "Receiving indexes"
idxs = JSON.parse(session_resource['pull/indexes'].get().to_s)
apply_table_filter(idxs).each do |table, indexes|
next unless indexes.size > 0
progress = ProgressBar.new(table, indexes.size)
indexes.each do |idx|
output = Taps::Utils.load_indexes(database_url, idx)
puts output if output
progress.inc(1)
end
progress.finish
end
end
|
#pull_partial_data ⇒ Object
267
268
269
270
271
272
273
274
275
276
277
|
# File 'lib/taps/operation.rb', line 267
def pull_partial_data
return if stream_state == {}
table_name = stream_state[:table_name]
record_count = tables[table_name.to_s]
puts "Resuming #{table_name}, #{format_number(record_count)} records"
progress = ProgressBar.new(table_name.to_s, record_count)
stream = Taps::DataStream.factory(db, stream_state)
pull_data_from_table(stream, progress)
end
|
#pull_reset_sequences ⇒ Object
366
367
368
369
370
371
|
# File 'lib/taps/operation.rb', line 366
def pull_reset_sequences
puts "Resetting sequences"
output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
puts output if output
end
|
#pull_schema ⇒ Object
238
239
240
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/taps/operation.rb', line 238
def pull_schema
puts "Receiving schema"
progress = ProgressBar.new('Schema', tables.size)
tables.each do |table_name, count|
schema_data = session_resource['pull/schema'].post({:table_name => table_name}, ).to_s
log.debug "Table: #{table_name}\n#{schema_data}\n"
output = Taps::Utils.load_schema(database_url, schema_data)
puts output if output
progress.inc(1)
end
progress.finish
end
|
#record_count ⇒ Object
313
314
315
|
# File 'lib/taps/operation.rb', line 313
def record_count
@record_count ||= remote_tables_info.values.inject(0) { |a,c| a += c }
end
|
#remote_tables_info ⇒ Object
317
318
319
|
# File 'lib/taps/operation.rb', line 317
def remote_tables_info
opts[:remote_tables_info] ||= fetch_remote_tables_info
end
|
#run ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
# File 'lib/taps/operation.rb', line 211
def run
verify_server
begin
unless resuming?
pull_schema
pull_indexes if indexes_first?
end
setup_signal_trap
pull_partial_data if resuming?
pull_data
pull_indexes unless indexes_first?
pull_reset_sequences
close_session
rescue RestClient::Exception => e
store_session
if e.respond_to?(:response)
puts "!!! Caught Server Exception"
puts "HTTP CODE: #{e.http_code}"
puts "#{e.response.to_s}"
exit(1)
else
raise
end
end
end
|
#tables ⇒ Object
304
305
306
307
308
309
310
311
|
# File 'lib/taps/operation.rb', line 304
def tables
h = {}
remote_tables_info.each do |table_name, count|
next if completed_tables.include?(table_name.to_s)
h[table_name.to_s] = count
end
h
end
|
#to_hash ⇒ Object
207
208
209
|
# File 'lib/taps/operation.rb', line 207
def to_hash
super.merge(:remote_tables_info => remote_tables_info)
end
|