Class: JobIteration::EnumeratorBuilder

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/job-iteration/enumerator_builder.rb

Defined Under Namespace

Classes: Wrapper

Constructor Details

#initialize(job, wrapper: Wrapper) ⇒ EnumeratorBuilder

Returns a new instance of EnumeratorBuilder.



# File 'lib/job-iteration/enumerator_builder.rb', line 33

def initialize(job, wrapper: Wrapper)
  @job = job
  @wrapper = wrapper
end

Instance Method Details

#build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args) ⇒ Object Also known as: active_record_on_batch_relations

Builds Enumerator from Active Record Relation and enumerates on batches, yielding Active Record Relations. See documentation for #build_active_record_enumerator_on_batches.



# File 'lib/job-iteration/enumerator_builder.rb', line 123

def build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args)
  enum = JobIteration::ActiveRecordBatchEnumerator.new(
    scope,
    cursor: cursor,
    **args,
  ).each
  enum = wrap(self, enum) if wrap
  enum
end

#build_active_record_enumerator_on_batches(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_batches

Builds Enumerator from Active Record Relation and enumerates on batches of records. Each Enumerator tick moves the cursor +batch_size+ rows forward.

+batch_size:+ sets how many records will be fetched in one batch. Defaults to 100.

For the remaining arguments, see the documentation for #build_active_record_enumerator_on_records.



# File 'lib/job-iteration/enumerator_builder.rb', line 112

def build_active_record_enumerator_on_batches(scope, cursor:, **args)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    **args,
  ).batches
  wrap(self, enum)
end
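
The batch behavior described above can be pictured with an in-memory stand-in for the table. This is a minimal sketch in plain Ruby, not the library's implementation: +fetch_batch+ and +batch_enumerator+ are hypothetical names, and the sketch assumes each tick yields the batch together with the id of its last row as the new cursor.

```ruby
# Hypothetical in-memory stand-in for a users table.
rows = (1..7).map { |id| { id: id } }

# Mimics: SELECT * FROM users WHERE id > cursor ORDER BY id LIMIT batch_size
def fetch_batch(rows, cursor:, batch_size: 100)
  remaining = cursor.nil? ? rows : rows.select { |row| row[:id] > cursor }
  remaining.sort_by { |row| row[:id] }.first(batch_size)
end

# Each tick fetches one batch and moves the cursor to the last id in it.
# Assumption of this sketch: the cursor is the last row's id.
def batch_enumerator(rows, cursor: nil, batch_size: 100)
  Enumerator.new do |yielder|
    position = cursor
    loop do
      batch = fetch_batch(rows, cursor: position, batch_size: batch_size)
      break if batch.empty?

      position = batch.last[:id]
      yielder.yield(batch, position)
    end
  end
end

# A fresh run, then a run resumed from a stored cursor of 3.
ticks = batch_enumerator(rows, batch_size: 3).map do |batch, cursor|
  [batch.map { |row| row[:id] }, cursor]
end
resumed = batch_enumerator(rows, cursor: 3, batch_size: 3).map do |batch, _cursor|
  batch.map { |row| row[:id] }
end
```

With seven rows and a batch size of 3, the fresh run produces three ticks, and the resumed run starts at id 4.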

#build_active_record_enumerator_on_records(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_records

Builds Enumerator from Active Record Relation. Each Enumerator tick moves the cursor one row forward.

The +columns:+ argument is used to build the actual query for iteration. +columns:+ defaults to the primary key:

1) SELECT * FROM users ORDER BY id LIMIT 100

When iteration is resumed, +cursor:+ and +columns:+ values will be used to continue from the point where iteration stopped:

2) SELECT * FROM users WHERE id > $CURSOR ORDER BY id LIMIT 100

+columns:+ can also take more than one column. In that case, +cursor+ will contain serialized values of all columns at the point where iteration stopped.

Consider this example with +columns: [:created_at, :id]+. Here's the query that will be used on the first iteration:

1) SELECT * FROM products ORDER BY created_at, id LIMIT 100

And the query on the next iteration:

2) SELECT * FROM products WHERE (created_at > '$LAST_CREATED_AT_CURSOR' OR (created_at = '$LAST_CREATED_AT_CURSOR' AND (id > '$LAST_ID_CURSOR'))) ORDER BY created_at, id LIMIT 100

As a result of this query pattern, if the values in these columns change for records in scope during iteration, those records may be skipped or yielded multiple times, depending on the nature of the update and the cursor's value. If a record that was already yielded has its value updated to one greater than the cursor's value, it will be yielded again. Similarly, if a record that has not yet been yielded has its value updated to one less than the cursor's value, it will be skipped.



# File 'lib/job-iteration/enumerator_builder.rb', line 97

def build_active_record_enumerator_on_records(scope, cursor:, **args)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    **args,
  ).records
  wrap(self, enum)
end
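
The multi-column cursor and the skip/repeat hazard described above can be reproduced without a database. The following is a rough sketch under stated assumptions, not the library's code: +next_page+ is a hypothetical helper, rows are plain hashes, and small Integers stand in for +created_at+ timestamps. It relies on Array#<=> comparing element by element, which matches the +created_at > c OR (created_at = c AND id > i)+ predicate.

```ruby
# Hypothetical helper mimicking the query for columns: [:created_at, :id]:
#   WHERE (created_at > c OR (created_at = c AND id > i))
#   ORDER BY created_at, id LIMIT n
def next_page(products, cursor:, limit:)
  key = ->(row) { [row[:created_at], row[:id]] }
  rows = products.sort_by(&key)
  rows = rows.select { |row| (key.call(row) <=> cursor) > 0 } if cursor
  rows.first(limit)
end

# Integers stand in for timestamps to keep the sketch short.
products = [
  { id: 1, created_at: 10 },
  { id: 2, created_at: 10 },
  { id: 3, created_at: 20 },
  { id: 4, created_at: 30 },
]

first_page = next_page(products, cursor: nil, limit: 2)
cursor = [first_page.last[:created_at], first_page.last[:id]]

# A row that was already yielded moves ahead of the cursor: yielded again.
products[0][:created_at] = 25
# A row that was not yet yielded moves behind the cursor: skipped.
products[3][:created_at] = 5

second_page = next_page(products, cursor: cursor, limit: 10)
first_ids = first_page.map { |row| row[:id] }
second_ids = second_page.map { |row| row[:id] }
```

In this run, id 1 appears on both pages and id 4 appears on neither, which is why iterating on columns that change during the job is risky.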

#build_array_enumerator(enumerable, cursor:) ⇒ Object Also known as: array

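Builds an Enumerator from a given Array, using +cursor+ as an offset. +cursor+ is the index of the last element already processed, so iteration resumes at index +cursor + 1+. Raises ArgumentError if +enumerable+ is not an Array.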



# File 'lib/job-iteration/enumerator_builder.rb', line 53

def build_array_enumerator(enumerable, cursor:)
  unless enumerable.is_a?(Array)
    raise ArgumentError, "enumerable must be an Array"
  end

  drop =
    if cursor.nil?
      0
    else
      cursor + 1
    end

  wrap(self, enumerable.each_with_index.drop(drop).to_enum { enumerable.size - drop })
end
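
To illustrate the offset arithmetic in the listing above, here is a small stand-alone sketch that repeats the +drop+ logic without the wrapper. +array_pairs+ is a hypothetical name used only for this example; it returns a plain Array of +[item, index]+ pairs rather than a wrapped Enumerator.

```ruby
# Same drop logic as the listing above, minus the wrapper and size block.
# A nil cursor means "start from the beginning"; otherwise skip everything
# up to and including the index stored in the cursor.
def array_pairs(array, cursor:)
  raise ArgumentError, "enumerable must be an Array" unless array.is_a?(Array)

  drop = cursor.nil? ? 0 : cursor + 1
  array.each_with_index.drop(drop)
end

fresh = array_pairs(%w[a b c d], cursor: nil)
resumed = array_pairs(%w[a b c d], cursor: 1)
finished = array_pairs(%w[a b c d], cursor: 3)
```

Each pair carries the element and its index, and that index is the value to store as the cursor for the next run.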

#build_csv_enumerator(enumerable, cursor:) ⇒ Object Also known as: csv



# File 'lib/job-iteration/enumerator_builder.rb', line 143

def build_csv_enumerator(enumerable, cursor:)
  enum = CsvEnumerator.new(enumerable).rows(cursor: cursor)
  wrap(self, enum)
end

#build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100) ⇒ Object Also known as: csv_on_batches



# File 'lib/job-iteration/enumerator_builder.rb', line 148

def build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100)
  enum = CsvEnumerator.new(enumerable).batches(cursor: cursor, batch_size: batch_size)
  wrap(self, enum)
end

#build_nested_enumerator(enums, cursor:) ⇒ Object Also known as: nested

Builds Enumerator for nested iteration.

Examples:

def build_enumerator(cursor:)
  enumerator_builder.nested(
    [
      ->(cursor) {
        enumerator_builder.active_record_on_records(Shop.all, cursor: cursor)
      },
      ->(shop, cursor) {
        enumerator_builder.active_record_on_records(shop.products, cursor: cursor)
      },
      ->(_shop, product, cursor) {
        enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor)
      }
    ],
    cursor: cursor
  )
end

def each_iteration(product_variants_relation)
  # do something
end

Parameters:

  • enums (Array<Proc>)

    an Array of Procs, each of which should return an Enumerator. Each proc should accept the items yielded by its parent enumerators, followed by the cursor, as its arguments. Each proc's cursor argument is that proc's element of the cursor array passed to build_enumerator.

  • cursor (Array<Object>)

    array of offsets for each of the enums to start iteration from



# File 'lib/job-iteration/enumerator_builder.rb', line 183

def build_nested_enumerator(enums, cursor:)
  enum = NestedEnumerator.new(enums, cursor: cursor).each
  wrap(self, enum)
end
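
The way each proc receives its parents' items plus its own slot of the cursor array can be sketched in plain Ruby. This is an illustration under stated assumptions and should not be read as the NestedEnumerator implementation: +pairs+, +nested_each+ and +nested_pairs+ are hypothetical helpers, arrays stand in for Active Record relations, and the choice of which position to report for a parent level is the sketch's own.

```ruby
# Hypothetical helper: [item, index] pairs, resuming after +cursor+.
def pairs(array, cursor)
  array.each_with_index.drop(cursor.nil? ? 0 : cursor + 1)
end

# Recursive sketch. Each proc gets the items yielded by its parents plus its
# own slot of the cursor array, and returns [item, position] pairs.
def nested_each(enums, cursors, parents = [], path = [], &block)
  depth = parents.size
  position = cursors[depth]
  enums[depth].call(*parents, position).each do |item, item_position|
    if depth == enums.size - 1
      block.call(item, path + [item_position])
    else
      # Sketch's own choice: report the position *before* this parent, so a
      # resumed run re-enters the same parent and continues its children.
      nested_each(enums, cursors, parents + [item], path + [position], &block)
      position = item_position
      cursors[depth + 1] = nil # the next parent's children start from scratch
    end
  end
end

def nested_pairs(enums, cursor: nil)
  results = []
  start = (cursor || Array.new(enums.size)).dup
  nested_each(enums, start) { |item, position| results << [item, position] }
  results
end

# Arrays stand in for Shop.all and shop.products.
shops = { "s1" => %w[p1 p2], "s2" => %w[p3] }
enums = [
  ->(cursor) { pairs(shops.keys, cursor) },
  ->(shop, cursor) { pairs(shops.fetch(shop), cursor) },
]

all = nested_pairs(enums)
resumed = nested_pairs(enums, cursor: [nil, 0])
```

Here the resumed run re-enters the first shop, skips its first product, and then continues with the second shop from the start of its products.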

#build_once_enumerator(cursor:) ⇒ Object Also known as: once

Builds an Enumerator that iterates once.



# File 'lib/job-iteration/enumerator_builder.rb', line 41

def build_once_enumerator(cursor:)
  wrap(self, build_times_enumerator(1, cursor: cursor))
end

#build_throttle_enumerator(enumerable, throttle_on:, backoff:) ⇒ Object Also known as: throttle



# File 'lib/job-iteration/enumerator_builder.rb', line 133

def build_throttle_enumerator(enumerable, throttle_on:, backoff:)
  enum = JobIteration::ThrottleEnumerator.new(
    enumerable,
    @job,
    throttle_on: throttle_on,
    backoff: backoff,
  ).to_enum
  wrap(self, enum)
end

#build_times_enumerator(number, cursor:) ⇒ Object Also known as: times

Builds an Enumerator that iterates N times, yielding numbers starting from zero.

Raises:

  • (ArgumentError) if +number+ is not an Integer


# File 'lib/job-iteration/enumerator_builder.rb', line 46

def build_times_enumerator(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

  wrap(self, build_array_enumerator(number.times.to_a, cursor: cursor))
end
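
Because the listing above delegates to the array enumerator with +number.times.to_a+, the values and their cursors coincide. The sketch below imitates that in plain Ruby, without the wrapper; +times_pairs+ is a hypothetical name, and a count of 1 corresponds to what #build_once_enumerator requests.

```ruby
# Plain-Ruby imitation: build [0, 1, ..., number - 1], pair each value with
# its index, and skip everything up to and including the cursor.
def times_pairs(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

  number.times.to_a.each_with_index.drop(cursor.nil? ? 0 : cursor + 1)
end

fresh = times_pairs(3, cursor: nil)
resumed = times_pairs(3, cursor: 0)
once = times_pairs(1, cursor: nil)
once_done = times_pairs(1, cursor: 0)
```

A "once" iteration that has stored cursor 0 has nothing left to do, so resuming it yields no further items.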