Module: Sidekiq::Job::Iterable

Includes:
Enumerators
Defined in:
lib/sidekiq/job/iterable.rb,
lib/sidekiq/job/iterable/enumerators.rb,
lib/sidekiq/job/iterable/csv_enumerator.rb,
lib/sidekiq/job/iterable/active_record_enumerator.rb

Defined Under Namespace

Modules: ClassMethods, Enumerators Classes: ActiveRecordEnumerator, CsvEnumerator

Constant Summary collapse

CANCELLATION_PERIOD =

Three days is the longest period you generally need to wait for a retry to execute when using the default retry scheme. We don’t want to “forget” the job is cancelled before it has a chance to execute and cancel itself.

(3 * 86_400).to_s

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Enumerators

#active_record_batches_enumerator, #active_record_records_enumerator, #active_record_relations_enumerator, #array_enumerator, #csv_batches_enumerator, #csv_enumerator

Class Method Details

.included(base) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



13
14
15
# File 'lib/sidekiq/job/iterable.rb', line 13

# Module inclusion hook: extends the including class (+base+) with
# ClassMethods so those methods become available on the class itself.
#
# @api private
# @param base [Module] the class or module that is including this module
def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#arguments ⇒ Object



37
38
39
# File 'lib/sidekiq/job/iterable.rb', line 37

# The arguments this job was performed with.
#
# @return [Array, nil] the frozen copy of the arguments stored by #perform;
#   nil when #perform has not run yet on this instance
def arguments
  @_args
end

#around_iteration ⇒ Object

A hook to override that will be called around each iteration.

Can be useful for some metrics collection, performance tracking etc.



80
81
82
# File 'lib/sidekiq/job/iterable.rb', line 80

# A hook to override that will be called around each iteration.
#
# Can be useful for some metrics collection, performance tracking etc.
# The default implementation only yields to the given block; an override
# must do the same or the wrapped block will not run.
def around_iteration
  yield
end

#build_enumerator ⇒ Enumerator

The enumerator to be iterated over.

Returns:

  • (Enumerator)

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



108
109
110
# File 'lib/sidekiq/job/iterable.rb', line 108

# The enumerator to be iterated over.
#
# This default implementation is a placeholder that always raises; the
# including job class is expected to override it. #perform calls the
# override with the job arguments plus a +cursor:+ keyword and skips the
# job when the override returns nil.
#
# @return [Enumerator]
# @raise [NotImplementedError] with a message advising subclasses to
#   implement an override for this method.
def build_enumerator(*)
  message = "#{self.class.name} must implement a '#build_enumerator' method"
  raise NotImplementedError, message
end

#cancel! ⇒ Object

Set a flag in Redis to mark this job as cancelled. Cancellation is asynchronous and is checked at the start of iteration and every 5 seconds thereafter as part of the recurring state flush.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/sidekiq/job/iterable.rb', line 49

# Set a flag in Redis to mark this job as cancelled.
#
# Cancellation is asynchronous and is checked at the start of iteration
# and every 5 seconds thereafter as part of the recurring state flush.
#
# The flag is the "cancelled" field of the hash stored under #iteration_key.
# HSETNX only writes the field when it is absent, so a timestamp recorded
# by an earlier cancellation is kept; the field is then read back and that
# stored value is what gets memoized and returned.
#
# @return [Integer] the cancellation timestamp (epoch seconds) as stored
#   in Redis
def cancel!
  # Already known to be cancelled on this instance: skip the Redis round trip.
  return @_cancelled if cancelled?

  # Use the shared key helper so the flag always lands on the same key as
  # the rest of this job's iteration state.
  key = iteration_key
  _, stored_at, _ = Sidekiq.redis do |conn|
    conn.pipelined do |pipe|
      pipe.hsetnx(key, "cancelled", Time.now.to_i)
      pipe.hget(key, "cancelled")
      # TODO When Redis 7.2 is required
      # pipe.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
      pipe.expire(key, Sidekiq::Job::Iterable::STATE_TTL)
    end
  end
  @_cancelled = stored_at.to_i
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/sidekiq/job/iterable.rb', line 65

# Whether this job instance is known to be cancelled.
#
# Returns the raw @_cancelled value rather than a strict true/false:
# nil (falsy) as set by #initialize, or the Integer timestamp stored by
# #cancel! (truthy). Code not shown here may also assign it.
#
# @return [Boolean]
def cancelled?
  @_cancelled
end

#each_iteration ⇒ void

This method returns an undefined value.

The action to be performed on each item from the enumerator.

Raises:

  • (NotImplementedError)

    with a message advising subclasses to implement an override for this method.



119
120
121
# File 'lib/sidekiq/job/iterable.rb', line 119

# The action to be performed on each item from the enumerator.
#
# This default implementation is a placeholder that always raises; the
# including job class is expected to override it.
#
# @return [void]
# @raise [NotImplementedError] with a message advising subclasses to
#   implement an override for this method.
def each_iteration(*)
  message = "#{self.class.name} must implement an '#each_iteration' method"
  raise NotImplementedError, message
end

#initialize ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



26
27
28
29
30
31
32
33
34
35
# File 'lib/sidekiq/job/iterable.rb', line 26

# Sets up the per-instance iteration bookkeeping after running the
# superclass initializer.
#
# Initial values:
# * @_executions - 0; incremented by #perform on every execution
# * @_cursor     - nil; handed to #build_enumerator as +cursor:+
# * @_start_time - nil; set by #perform from the monotonic clock
# * @_runtime    - 0
# * @_args       - nil; replaced by #perform with a frozen copy of the args
# * @_cancelled  - nil; set by #cancel!
#
# @api private
def initialize
  super

  # Assigned left to right, so the ivars are created in the order listed above.
  @_executions, @_cursor, @_start_time = 0, nil, nil
  @_runtime, @_args, @_cancelled = 0, nil, nil
end

#iteration_key ⇒ Object



123
124
125
# File 'lib/sidekiq/job/iterable.rb', line 123

# The Redis key for this job's iteration data: "it-" followed by the job's
# jid. This is the same key #cancel! writes its "cancelled" flag under.
#
# @return [String]
def iteration_key
  "it-#{jid}"
end

#on_complete ⇒ Object

A hook to override that will be called when the job finishes iterating.



98
99
# File 'lib/sidekiq/job/iterable.rb', line 98

# A hook to override that will be called when the job finishes iterating.
#
# #perform invokes it once the run is judged complete, just before it
# calls cleanup. The default implementation does nothing.
def on_complete
end

#on_resume ⇒ Object

A hook to override that will be called when the job resumes iterating.



86
87
# File 'lib/sidekiq/job/iterable.rb', line 86

# A hook to override that will be called when the job resumes iterating.
#
# #perform invokes it instead of #on_start on every execution after the
# first. The default implementation does nothing.
def on_resume
end

#on_start ⇒ Object

A hook to override that will be called when the job starts iterating.

It is called only once, on the job's first execution; later executions call #on_resume instead.



73
74
# File 'lib/sidekiq/job/iterable.rb', line 73

# A hook to override that will be called when the job starts iterating.
#
# It is called only once: #perform invokes it when the execution counter
# is 1 and calls #on_resume otherwise. The default implementation does
# nothing.
def on_start
end

#on_stop ⇒ Object

A hook to override that will be called each time the job is interrupted.

This can be due to interruption or Sidekiq stopping.



93
94
# File 'lib/sidekiq/job/iterable.rb', line 93

# A hook to override that will be called each time the job is interrupted.
#
# This can be due to interruption or Sidekiq stopping. #perform invokes it
# after the iteration step returns (or is aborted via +throw :abort+) and
# before it decides whether the job is complete. The default
# implementation does nothing.
def on_stop
end

#perform(*args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/sidekiq/job/iterable.rb', line 128

# Runs one execution of the iterable job.
#
# Stores a frozen copy of the arguments, restores any previous iteration
# state, builds the enumerator, fires #on_start (first execution) or
# #on_resume (later executions), iterates, fires #on_stop, and then either
# completes (#on_complete followed by cleanup) or re-enqueues the job.
#
# @api private
# @param args [Array] the job arguments, also forwarded to #build_enumerator
# @return [Object, nil] nil when #build_enumerator yields no enumerator;
#   otherwise whatever cleanup or the re-enqueue step returns
def perform(*args)
  @_args = args.dup.freeze
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  source = build_enumerator(*args, cursor: @_cursor)
  if source
    assert_enumerator!(source)
  else
    # Nothing to iterate over: skip the job without firing any hook.
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  first_execution = @_executions == 1
  first_execution ? on_start : on_resume

  # A `throw :abort` from inside the iteration lands here; the thrown value
  # becomes the iteration outcome.
  outcome = catch(:abort) { iterate_with_enumerator(source, args) }

  on_stop

  # Not finished yet: schedule another execution.
  return reenqueue_iteration_job unless handle_completed(outcome)

  on_complete
  cleanup
end