Class: Lhm::Chunker

Inherits:
Object
  • Object
show all
Includes:
Command, SqlHelper
Defined in:
lib/lhm/chunker.rb

Constant Summary collapse

LOG_PREFIX =
"Chunker"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SqlHelper

#annotation, #idx_name, #idx_spec, #tagged, #version_string

Methods included from Command

#run

Constructor Details

#initialize(migration, connection = nil, options = {}) ⇒ Chunker

Copy from origin to destination in chunks of size `stride`. Use the `throttler` class to sleep between strides.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/lhm/chunker.rb', line 20

# Build a chunker that copies rows from the migration's origin table to its
# destination table in chunks of `stride` rows, using the `throttler` to
# pause between strides.
#
# @param migration [Object] the migration being performed; handed to the
#   ChunkFinder and later to each ChunkInsert
# @param connection [Object, nil] connection used for the copy; also shared
#   with the throttler when it exposes `connection=`
# @param options [Hash] settings read here:
#   :raise_on_warnings (default false), :verifier, :throttler,
#   :printer (default: a new Printer::Percentage),
#   :retriable (retry options for SqlRetry, default {}).
#   The whole hash is also passed to ChunkFinder.
def initialize(migration, connection = nil, options = {})
  @migration = migration
  @connection = connection
  @options = options

  # Determines the id range (start/limit) to be copied.
  @chunk_finder = ChunkFinder.new(migration, connection, options)

  @raise_on_warnings = options.fetch(:raise_on_warnings, false)
  @verifier = options[:verifier]

  # Hand our connection to the throttler if it is able to take one.
  @throttler = options[:throttler]
  if @throttler && @throttler.respond_to?(:connection=)
    @throttler.connection = @connection
  end

  @start = @chunk_finder.start
  @limit = @chunk_finder.limit

  @printer = options[:printer]
  @printer ||= Printer::Percentage.new

  @retry_options = options[:retriable] || {}
  @retry_helper = SqlRetry.new(@connection, retry_options: @retry_options)
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/lhm/chunker.rb', line 14

# Read-only accessor for the database connection supplied to the constructor.
#
# @return [Object, nil] the connection, or nil when none was given
def connection
  @connection
end

Instance Method Details

#execute ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/lhm/chunker.rb', line 40

# Copy rows from the origin table into the destination table, one chunk per
# loop iteration, beginning at @start (from the ChunkFinder) and continuing
# until @next_to_insert passes @limit.
#
# Returns nil immediately, without calling @printer.end, when the ChunkFinder
# reports the table as empty. Any StandardError raised while copying is first
# passed to @printer.exception (if the printer responds to it) and then
# re-raised unchanged.
#
# @return [Object] the value returned by @printer.end (nil for an empty table)
# @raise [ActiveRecord::StatementInvalid] when an insert fails for any reason
#   other than a max_binlog_cache_size overflow that the throttler can back
#   off from
def execute
  # Time of the last progress log line (wall clock, not monotonic).
  @start_time = Time.now

  return if @chunk_finder.table_empty?
  @next_to_insert = @start
  # Together with the `break` at the bottom of the loop, the `@start == @limit`
  # clause makes a single-id range run exactly once.
  # NOTE(review): @next_to_insert starts equal to @start, so in this view the
  # `||` clause looks redundant. Confirm before removing it.
  while @next_to_insert <= @limit || (@start == @limit)
    # Stride is re-read every iteration, so throttler backoff takes effect
    # on the retry.
    # NOTE(review): this dereferences @throttler without a nil check, although
    # the constructor tolerates a missing :throttler and the check further
    # down guards with `@throttler &&`. Confirm callers always supply one.
    stride = @throttler.stride
    # upper_id is defined outside this view. Presumably it returns the last
    # id of this chunk for the given start and stride; confirm.
    top = upper_id(@next_to_insert, stride)
    # Defined outside this view. Presumably it aborts when the copy must not
    # proceed; confirm.
    verify_can_run

    begin
      # `bottom` is defined outside this view (presumably it returns
      # @next_to_insert; confirm). The returned value is the count of rows
      # this chunk created.
      affected_rows = ChunkInsert.new(@migration, @connection, bottom, top, @retry_options).insert_and_return_count_of_rows_created
    rescue ActiveRecord::StatementInvalid => e
      # If the chunk's transaction overflowed the binlog cache (message matched
      # case-insensitively) and the throttler can back off, shrink the stride
      # and retry. `next` re-enters the loop without advancing @next_to_insert.
      # NOTE(review): there is no retry cap here. Termination relies on
      # backoff_stride eventually raising or a smaller chunk succeeding.
      # Any other error is re-raised.
      if e.message.downcase.include?("transaction required more than 'max_binlog_cache_size' bytes of storage") && @throttler.respond_to?(:backoff_stride)
        Lhm.logger.info("Encountered max_binlog_cache_size error, attempting to reduce stride size")
        @throttler.backoff_stride
        next
      else
        raise e
      end
    end

    # Number of ids in the inclusive range [bottom, top].
    expected_rows = top - bottom + 1

    # Log progress at most once per 5 minutes rather than on every iteration.
    # The first line appears only once 5 minutes have elapsed since execute
    # started, and each line describes only the current chunk.
    current_time = Time.now
    if current_time - @start_time > (5 * 60)
      Lhm.logger.info("Inserted #{affected_rows} rows into the destination table from #{bottom} to #{top}")
      @start_time = current_time
    end

    # Fewer inserted rows than ids in the range triggers a warnings check. The
    # helper is defined outside this view; by its name it presumably raises on
    # non-primary-key duplicate warnings. Confirm.
    if affected_rows < expected_rows
      raise_on_non_pk_duplicate_warning
    end

    # Throttle only after chunks that actually copied rows.
    if @throttler && affected_rows > 0
      @throttler.run
    end

    # Advance past this chunk.
    # NOTE(review): `bottom` is evaluated again after @next_to_insert has
    # moved. If it simply returns @next_to_insert, the printer receives the
    # next chunk's start rather than this chunk's. Confirm this is intended.
    @next_to_insert = top + 1
    @printer.notify(bottom, @limit)

    break if @start == @limit
  end
  @printer.end
rescue => e
  # Let the printer report the failure when it supports that, then re-raise
  # the original exception.
  @printer.exception(e) if @printer.respond_to?(:exception)
  raise
end