Class: PluckInBatches::Iterator

Inherits:
Object
  • Object
show all
Defined in:
lib/pluck_in_batches/iterator.rb

Overview

:nodoc:

Constant Summary collapse

# Sort directions accepted by the :order option.
VALID_ORDERS = %i[asc desc].freeze
# Direction used when the caller does not pass :order.
DEFAULT_ORDER = :asc

Instance Method Summary collapse

Constructor Details

#initialize(relation) ⇒ Iterator

Returns a new instance of Iterator.



8
9
10
11
# File 'lib/pluck_in_batches/iterator.rb', line 8

# Stores the relation to iterate over, together with the class reported by
# its +klass+ method.
#
# @param relation [Object] must respond to +klass+
def initialize(relation)
  @relation, @klass = relation, relation.klass
end

Instance Method Details

#each(*columns, start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor_column: @relation.primary_key, order: DEFAULT_ORDER, &block) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/pluck_in_batches/iterator.rb', line 13

# Yields plucked values one at a time by delegating to +each_batch+ and
# iterating every batch it produces.
#
# All keyword options are forwarded unchanged to +each_batch+.
#
# @param columns [Array] columns to pluck; at least one is required
# @return [Object] with a block, whatever +each_batch+ returns
# @return [Enumerator] without a block; its lazily computed size is the
#   +size+ of the relation after +apply_limits+ has been applied
# @raise [ArgumentError] when no columns are given
def each(*columns, start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor_column: @relation.primary_key, order: DEFAULT_ORDER, &block)
  if columns.empty?
    raise ArgumentError, "Call `pluck_each' with at least one column."
  end

  if block_given?
    each_batch(*columns, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor_column: cursor_column, order: order) do |batch|
      batch.each(&block)
    end
  else
    enum_for(__callee__, *columns, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor_column: cursor_column, order: order) do
      # The size helpers take the cursor columns as well: pass them in the
      # same positions each_batch uses, otherwise Enumerator#size raises
      # ArgumentError (wrong number of arguments).
      cursor_columns = Array(cursor_column).map(&:to_s)
      batch_orders = build_batch_orders(cursor_columns, order)
      apply_limits(@relation, cursor_columns, start, finish, batch_orders).size
    end
  end
end

#each_batch(*columns, start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor_column: @relation.primary_key, order: DEFAULT_ORDER) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/pluck_in_batches/iterator.rb', line 29

# Plucks +columns+ from the relation in successive batches and yields each
# batch to the block. After every batch, the next query is built by
# +batch_condition+ from the cursor column values of the batch's last row.
#
# @param columns [Array] columns to pluck; Arel nodes are passed through,
#   everything else is converted with +to_s+
# @param start passed to +apply_limits+
# @param finish passed to +apply_limits+
# @param batch_size [Integer] maximum number of rows per query
# @param error_on_ignore passed to +act_on_ignored_order+ when the relation
#   already carries an order
# @param cursor_column [Object, Array] column(s) used to advance between
#   batches; any not among +columns+ are plucked as well and stripped
#   before yielding
# @param order [Symbol, Array<Symbol>] :asc / :desc, per cursor column
# @return [Enumerator] when no block is given; its size is the number of
#   batches computed from the limited relation's +size+
# @raise [ArgumentError] when no columns are given or +order+ is invalid
def each_batch(*columns, start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor_column: @relation.primary_key, order: DEFAULT_ORDER)
  raise ArgumentError, "Call `pluck_in_batches' with at least one column." if columns.empty?

  if Array(order).any? { |direction| !VALID_ORDERS.include?(direction) }
    raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
  end

  selected = columns.map { |column| Arel.arel_node?(column) ? column : column.to_s }

  # Find where each cursor column sits in the pluck list; columns that were
  # not requested are appended at the end so their values can still be read.
  cursor_names = Array(cursor_column).map(&:to_s)
  cursor_positions = cursor_column_indexes(selected, cursor_names)
  appended_count = cursor_positions.count(&:nil?)
  cursor_positions.each_index do |slot|
    next if cursor_positions[slot]

    cursor_positions[slot] = selected.size
    selected << cursor_names[slot]
  end

  base = @relation
  orderings = build_batch_orders(cursor_names, order)

  unless block_given?
    return to_enum(__callee__, *columns, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor_column: cursor_column, order: order) do
      row_count = apply_limits(base, cursor_names, start, finish, orderings).size
      (row_count - 1).div(batch_size) + 1
    end
  end

  act_on_ignored_order(error_on_ignore) if base.arel.orders.present?

  # Honour an explicit limit on the relation: never fetch more than that
  # many rows in total, and shrink the batch when the limit is smaller.
  rows_left = base.limit_value
  per_batch = (rows_left && rows_left < batch_size) ? rows_left : batch_size

  scoped = base.reorder(orderings.to_h).limit(per_batch)
  scoped = apply_limits(scoped, cursor_names, start, finish, orderings)
  scoped.skip_query_cache! # Caching the results would defeat the purpose of batching
  current = scoped

  loop do
    batch = current.pluck(*selected)
    break if batch.empty?

    # Read the cursor values off the last row before the helper columns
    # are stripped. With a single plucked column the rows are bare values.
    tail = batch.last
    cursor_values =
      if selected.size == 1
        Array(tail)
      else
        cursor_positions.map { |position| tail[position] }
      end

    # Remove the cursor columns that were appended only for pagination.
    appended_count.times { batch.each(&:pop) }
    batch.flatten!(1) if columns.size == 1

    yield batch

    # A short batch means there is nothing left to fetch.
    break if batch.length < per_batch

    if rows_left
      rows_left -= batch.length

      # Stops here when the limit is an exact multiple of the batch size,
      # avoiding one pointless extra query.
      break if rows_left.zero?

      scoped = scoped.limit(rows_left) if rows_left < per_batch
    end

    # Inclusive comparison on every cursor column except the last one,
    # which is strict.
    comparators = orderings.map { |_name, direction| direction == :desc ? :lteq : :gteq }
    _name, final_direction = orderings.last
    comparators[-1] = final_direction == :desc ? :lt : :gt

    current = batch_condition(scoped, cursor_names, cursor_values, comparators)
  end
end