Class: Backfiller::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/backfiller/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task_name) ⇒ Runner

Returns a new instance of Runner.



12
13
14
15
16
17
18
# File 'lib/backfiller/runner.rb', line 12

def initialize(task_name)
  @task = build_task(task_name)
  @connection_pool = @task.respond_to?(:connection_pool) ? @task.connection_pool : default_connection_pool
  @batch_size = @task.respond_to?(:batch_size) ? @task.batch_size : Backfiller.batch_size
  @cursor_threshold = @task.respond_to?(:cursor_threshold) ? @task.cursor_threshold : Backfiller.cursor_threshold
  @process_method = @task.respond_to?(:process_row) ? @task.method(:process_row) : method(:process_row)
end

Instance Attribute Details

#batch_sizeObject (readonly)

Returns the value of attribute batch_size.



5
6
7
# File 'lib/backfiller/runner.rb', line 5

def batch_size
  @batch_size
end

#connection_poolObject (readonly)

Returns the value of attribute connection_pool.



5
6
7
# File 'lib/backfiller/runner.rb', line 5

def connection_pool
  @connection_pool
end

#cursor_thresholdObject (readonly)

Returns the value of attribute cursor_threshold.



5
6
7
# File 'lib/backfiller/runner.rb', line 5

def cursor_threshold
  @cursor_threshold
end

#process_methodObject (readonly)

Returns the value of attribute process_method.



5
6
7
# File 'lib/backfiller/runner.rb', line 5

def process_method
  @process_method
end

#taskObject (readonly)

Returns the value of attribute task.



5
6
7
# File 'lib/backfiller/runner.rb', line 5

def task
  @task
end

Instance Method Details

#runObject

It uses two connections from pool:

  • master [M] - reads data using cursor in transaction

  • worker [W] - changes data based on record red from master

Examples:

[M] BEGIN
[M] DECLARE backfill_cursor SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM users
// Start fetch and process loop:
[M] FETCH 1000 backfill_cursor
  [W] UPDATE users SET full_name = '...' where id = 1
  [W] ...
  [W] UPDATE users SET full_name = '...' where id = 1000
[M] FETCH 1000 backfill_cursor
  [W] UPDATE users SET full_name = '...' where id = 1001
  [W] ...
  [W] UPDATE users SET full_name = '...' where id = 2000
// Records per cursor transaction threshold reached. Reopen transaction.
[M] CLOSE backfill_cursor
[M] COMMIT
[M] BEGIN
[M] DECLARE backfill_cursor SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM users
[M] FETCH 1000 backfill_cursor
// The end of cursor reached. Break cursor loop and exit.
[M] CLOSE backfill_cursor
[M] COMMIT


45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/backfiller/runner.rb', line 45

def run
  master_connection = acquire_connection
  worker_connection = acquire_connection

  begin
    run_cursor_loop(master_connection) do |row|
      process_method.call(worker_connection, row)
    end
  ensure
    release_connection(master_connection)
    release_connection(worker_connection)
  end
end