Class: Lhm::Chunker
- Inherits:
-
Object
- Object
- Lhm::Chunker
- Defined in:
- lib/lhm/chunker.rb
Constant Summary collapse
- LOG_PREFIX =
"Chunker"
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
- #execute ⇒ Object
-
#initialize(migration, connection = nil, options = {}) ⇒ Chunker
constructor
Copy from origin to destination in chunks of size `stride`.
Methods included from SqlHelper
#annotation, #idx_name, #idx_spec, #tagged, #version_string
Methods included from Command
Constructor Details
#initialize(migration, connection = nil, options = {}) ⇒ Chunker
Copy from origin to destination in chunks of size `stride`. Use the `throttler` class to sleep between each stride.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/lhm/chunker.rb', line 20

# Copy from origin to destination in chunks of size `stride`. Use the
# `throttler` class to sleep between each stride.
#
# @param migration [Object] the migration handed to ChunkFinder (and later
#   to ChunkInsert by #execute)
# @param connection [Object, nil] database connection; also handed to the
#   throttler (when it accepts one) and to SqlRetry
# @param options [Hash] optional settings read by this constructor:
#   * :raise_on_warnings - stored as-is, defaults to false
#   * :verifier          - stored as-is, may be nil
#   * :throttler         - stored as-is, may be nil
#   * :printer           - defaults to Printer::Percentage.new
#   * :retriable         - retry options, defaults to {}
#   The whole hash is also forwarded to ChunkFinder.new.
def initialize(migration, connection = nil, options = {})
  @migration = migration
  @connection = connection
  @chunk_finder = ChunkFinder.new(migration, connection, options)
  @options = options
  @raise_on_warnings = options.fetch(:raise_on_warnings, false)
  @verifier = options[:verifier]

  # Assignment in the condition is intentional: only a throttler that was
  # actually supplied gets wired to this chunker's connection.
  if (@throttler = options[:throttler])
    @throttler.connection = @connection if @throttler.respond_to?(:connection=)
  end

  # The id range to copy is whatever the chunk finder reports.
  @start = @chunk_finder.start
  @limit = @chunk_finder.limit

  @printer = options[:printer] || Printer::Percentage.new
  @retry_options = options[:retriable] || {}
  @retry_helper = SqlRetry.new(
    @connection,
    retry_options: @retry_options
  )
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
14 15 16 |
# File 'lib/lhm/chunker.rb', line 14

# Returns the value of attribute connection.
#
# @return [Object] whatever was stored in @connection (nil if never set)
def connection
  @connection
end
Instance Method Details
#execute ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/lhm/chunker.rb', line 40

# Runs the chunked copy: repeatedly inserts the id range bottom..top via
# ChunkInsert until @next_to_insert passes @limit, notifying the printer
# after each chunk and calling the throttler after chunks that inserted rows.
#
# Returns early (nil) when the chunk finder reports an empty table.
#
# Any error escaping the loop is passed to @printer.exception (when the
# printer supports it) and then re-raised unchanged.
#
# NOTE(review): `bottom`, `upper_id`, `verify_can_run` and
# `raise_on_non_pk_duplicate_warning` are helpers defined outside this
# listing; `bottom` presumably mirrors @next_to_insert - confirm.
# NOTE(review): @throttler is dereferenced unconditionally for `stride` but
# nil-checked before `run`; confirm callers always supply a throttler.
#
# @return [Object, nil] nil on the empty-table path, otherwise the value of
#   @printer.end
# @raise [ActiveRecord::StatementInvalid] when an insert fails for a reason
#   other than the max_binlog_cache_size error handled below
def execute
  @start_time = Time.now

  return if @chunk_finder.table_empty?

  @next_to_insert = @start
  while @next_to_insert <= @limit || (@start == @limit)
    stride = @throttler.stride
    top = upper_id(@next_to_insert, stride)
    verify_can_run

    begin
      affected_rows = ChunkInsert.new(@migration, @connection, bottom, top, @retry_options).insert_and_return_count_of_rows_created
    rescue ActiveRecord::StatementInvalid => e
      if e.message.downcase.include?("transaction required more than 'max_binlog_cache_size' bytes of storage") && @throttler.respond_to?(:backoff_stride)
        Lhm.logger.info("Encountered max_binlog_cache_size error, attempting to reduce stride size")
        # Shrink the stride and retry the same range: @next_to_insert is
        # left untouched, so the next pass starts from the same bottom.
        @throttler.backoff_stride
        next
      else
        raise e
      end
    end

    expected_rows = top - bottom + 1

    # Only log the chunker progress every 5 minutes instead of every iteration
    current_time = Time.now
    if current_time - @start_time > (5 * 60)
      Lhm.logger.info("Inserted #{affected_rows} rows into the destination table from #{bottom} to #{top}")
      @start_time = current_time
    end

    # Fewer rows than the width of the id range triggers the warning check.
    if affected_rows < expected_rows
      raise_on_non_pk_duplicate_warning
    end

    if @throttler && affected_rows > 0
      @throttler.run
    end

    @next_to_insert = top + 1
    @printer.notify(bottom, @limit)

    # When start and limit coincide the loop condition stays true forever,
    # so leave explicitly after the single pass.
    break if @start == @limit
  end
  @printer.end
rescue => e
  @printer.exception(e) if @printer.respond_to?(:exception)
  raise
end