Method: Taps::Push#push_data_from_table

Defined in:
lib/taps/operation.rb

#push_data_from_table(stream, progress) ⇒ Object



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/taps/operation.rb', line 480

def push_data_from_table(stream, progress)
  loop do
    if exiting?
      store_session
      exit 0
    end

    row_size = 0
    chunksize = stream.state[:chunksize]

    begin
      chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
        stream.state[:chunksize] = c.to_i
        encoded_data, row_size, elapsed_time = nil
        d1 = c.time_delta do
          encoded_data, row_size, elapsed_time = stream.fetch
        end
        break if stream.complete?

        data = nil
        d2 = c.time_delta do
          data = {
            :state => stream.to_hash,
            :checksum => Taps::Utils.checksum(encoded_data).to_s
          }
        end

        begin
          content, content_type = nil
          d3 = c.time_delta do
            content, content_type = Taps::Multipart.create do |r|
              r.attach :name => :encoded_data,
                :payload => encoded_data,
                :content_type => 'application/octet-stream'
              r.attach :name => :json,
                :payload => OkJson.encode(data),
                :content_type => 'application/json'
            end
          end
          session_resource['push/table'].post(content, http_headers(:content_type => content_type))
          self.stream_state = stream.to_hash
        rescue => e
          Taps::Utils.reraise_server_exception(e)
        end

        c.idle_secs = (d1 + d2 + d3)

        elapsed_time
      end
    rescue Taps::CorruptedData => e
      # retry the same data, it got corrupted somehow.
      next
    rescue Taps::DuplicatePrimaryKeyError => e
      # verify the stream and retry it
      stream = stream.verify_remote_stream(session_resource['push/verify_stream'], http_headers)
      next
    end
    stream.state[:chunksize] = chunksize

    progress.inc(row_size)

    stream.increment(row_size)
    break if stream.complete?
  end

  progress.finish
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end