Class: SidekiqIteration::ActiveRecordEnumerator

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq_iteration/active_record_enumerator.rb

Constant Summary collapse

SQL_DATETIME_WITH_NSEC =
"%Y-%m-%d %H:%M:%S.%6N"

Instance Method Summary collapse

Constructor Details

#initialize(relation, columns: nil, batch_size: 100, order: :asc, cursor: nil) ⇒ ActiveRecordEnumerator

Returns a new instance of ActiveRecordEnumerator.

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/sidekiq_iteration/active_record_enumerator.rb', line 8

def initialize(relation, columns: nil, batch_size: 100, order: :asc, cursor: nil)
  unless relation.is_a?(ActiveRecord::Relation)
    raise ArgumentError, "relation must be an ActiveRecord::Relation"
  end

  unless order == :asc || order == :desc
    raise ArgumentError, ":order must be :asc or :desc, got #{order.inspect}"
  end

  @primary_key = "#{relation.table_name}.#{relation.primary_key}"
  @columns = Array(columns&.map(&:to_s) || @primary_key)
  @primary_key_index = @columns.index(@primary_key) || @columns.index(relation.primary_key)
  @pluck_columns = if @primary_key_index
                     @columns
                   else
                     @columns + [@primary_key]
                   end
  @batch_size = batch_size
  @order = order
  @cursor = Array.wrap(cursor)
  raise ArgumentError, "Must specify at least one column" if @columns.empty?
  if relation.joins_values.present? && !@columns.all?(/\./)
    raise ArgumentError, "You need to specify fully-qualified columns if you join a table"
  end

  if relation.arel.orders.present? || relation.arel.taken.present?
    raise ArgumentError,
      "The relation cannot use ORDER BY or LIMIT due to the way how iteration with a cursor is designed. " \
      "You can use other ways to limit the number of rows, e.g. a WHERE condition on the primary key column."
  end

  ordering = @columns.to_h { |column| [column, @order] }
  @base_relation = relation.reorder(ordering)
  @iteration_count = 0
end

Instance Method Details

#batchesObject



55
56
57
58
59
60
61
62
# File 'lib/sidekiq_iteration/active_record_enumerator.rb', line 55

def batches
  Enumerator.new(-> { records_size }) do |yielder|
    while (batch = next_batch(load: true))
      @iteration_count += 1
      yielder.yield(batch, cursor_value(batch.last))
    end
  end
end

#recordsObject



44
45
46
47
48
49
50
51
52
53
# File 'lib/sidekiq_iteration/active_record_enumerator.rb', line 44

def records
  Enumerator.new(-> { records_size }) do |yielder|
    batches.each do |batch, _|
      batch.each do |record|
        @iteration_count += 1
        yielder.yield(record, cursor_value(record))
      end
    end
  end
end

#relationsObject



64
65
66
67
68
69
70
71
# File 'lib/sidekiq_iteration/active_record_enumerator.rb', line 64

def relations
  Enumerator.new(-> { relations_size }) do |yielder|
    while (batch = next_batch(load: false))
      @iteration_count += 1
      yielder.yield(batch, unwrap_array(@cursor))
    end
  end
end