Module: Sidekiq::Job::Iterable

Includes:
Enumerators
Defined in:
lib/sidekiq/job/iterable.rb,
lib/sidekiq/job/iterable/enumerators.rb,
lib/sidekiq/job/iterable/csv_enumerator.rb,
lib/sidekiq/job/iterable/active_record_enumerator.rb

Defined Under Namespace

Modules: ClassMethods, Enumerators Classes: ActiveRecordEnumerator, CsvEnumerator

Constant Summary collapse

CANCELLATION_PERIOD =

Three days is the longest period you generally need to wait for a retry to execute when using the default retry scheme. We don’t want to “forget” the job is cancelled before it has a chance to execute and cancel itself.

(3 * 86_400).to_s

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Enumerators

#active_record_batches_enumerator, #active_record_records_enumerator, #active_record_relations_enumerator, #array_enumerator, #csv_batches_enumerator, #csv_enumerator

Instance Attribute Details

#current_objectObject (readonly)

Access to the current object while iterating. This value is not reset so the latest element is explicitly available to cleanup/complete callbacks.



41
42
43
# File 'lib/sidekiq/job/iterable.rb', line 41

def current_object
  @current_object
end

Class Method Details

.included(base) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



13
14
15
# File 'lib/sidekiq/job/iterable.rb', line 13

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#argumentsObject



43
44
45
# File 'lib/sidekiq/job/iterable.rb', line 43

def arguments
  @_args
end

#around_iterationObject

A hook to override that will be called around each iteration.

Can be useful for some metrics collection, performance tracking etc.



88
89
90
# File 'lib/sidekiq/job/iterable.rb', line 88

def around_iteration
  yield
end

#build_enumeratorEnumerator

The enumerator to be iterated over.

Returns:

  • (Enumerator)

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



121
122
123
# File 'lib/sidekiq/job/iterable.rb', line 121

def build_enumerator(*)
  raise NotImplementedError, "#{self.class.name} must implement a '#build_enumerator' method"
end

#cancel!Object

Set a flag in Redis to mark this job as cancelled. Cancellation is asynchronous and is checked at the start of iteration and every 5 seconds thereafter as part of the recurring state flush.



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sidekiq/job/iterable.rb', line 55

def cancel!
  return @_cancelled if cancelled?

  key = "it-#{jid}"
  _, result, _ = Sidekiq.redis do |c|
    c.pipelined do |p|
      p.hsetnx(key, "cancelled", Time.now.to_i)
      p.hget(key, "cancelled")
      p.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
    end
  end
  @_cancelled = result.to_i
end

#cancelled?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/sidekiq/job/iterable.rb', line 69

def cancelled?
  @_cancelled
end

#cursorObject



73
74
75
# File 'lib/sidekiq/job/iterable.rb', line 73

def cursor
  @_cursor.freeze
end

#each_iterationvoid

This method returns an undefined value.

The action to be performed on each item from the enumerator.

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



132
133
134
# File 'lib/sidekiq/job/iterable.rb', line 132

def each_iteration(*)
  raise NotImplementedError, "#{self.class.name} must implement an '#each_iteration' method"
end

#initializeObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/sidekiq/job/iterable.rb', line 26

def initialize
  super

  @_executions = 0
  @_cursor = nil
  @_start_time = nil
  @_runtime = 0
  @_args = nil
  @_cancelled = nil
  @current_object = nil
end

#iteration_keyObject



136
137
138
# File 'lib/sidekiq/job/iterable.rb', line 136

def iteration_key
  "it-#{jid}"
end

#on_cancelObject

A hook to override that will be called when the job is cancelled.



106
107
# File 'lib/sidekiq/job/iterable.rb', line 106

def on_cancel
end

#on_completeObject

A hook to override that will be called when the job finished iterating.



111
112
# File 'lib/sidekiq/job/iterable.rb', line 111

def on_complete
end

#on_resumeObject

A hook to override that will be called when the job resumes iterating.



94
95
# File 'lib/sidekiq/job/iterable.rb', line 94

def on_resume
end

#on_startObject

A hook to override that will be called when the job starts iterating.

It is called only once, for the first time.



81
82
# File 'lib/sidekiq/job/iterable.rb', line 81

def on_start
end

#on_stopObject

A hook to override that will be called each time the job is interrupted.

This can be due to interruption or sidekiq stopping.



101
102
# File 'lib/sidekiq/job/iterable.rb', line 101

def on_stop
end

#perform(*args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/sidekiq/job/iterable.rb', line 141

def perform(*args)
  @_args = args.dup.freeze
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*args, cursor: @_cursor)
  unless enumerator
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  if @_executions == 1
    on_start
  else
    on_resume
  end

  completed = catch(:abort) do
    iterate_with_enumerator(enumerator, args)
  end

  on_stop
  completed = handle_completed(completed)

  if completed
    on_complete
    cleanup
  else
    reenqueue_iteration_job
  end
end