Module: Sidekiq::Job::Iterable
- Includes: Enumerators
- Defined in: lib/sidekiq/job/iterable.rb, lib/sidekiq/job/iterable/enumerators.rb, lib/sidekiq/job/iterable/csv_enumerator.rb, lib/sidekiq/job/iterable/active_record_enumerator.rb
Defined Under Namespace
Modules: ClassMethods, Enumerators
Classes: ActiveRecordEnumerator, CsvEnumerator
Constant Summary
- CANCELLATION_PERIOD = (3 * 86_400).to_s
Three days is the longest period you generally need to wait for a retry to execute when using the default retry scheme. We don’t want to “forget” the job is cancelled before it has a chance to execute and cancel itself.
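As a quick check of the arithmetic, the expression is the number of seconds in three days, stored as a string. The snippet below recomputes it in plain Ruby without loading Sidekiq; the constant names are illustrative only and are not part of the library.

```ruby
# Hypothetical names, for illustration only.
SECONDS_PER_DAY = 24 * 60 * 60 # 86_400
CANCELLATION_PERIOD_SKETCH = (3 * SECONDS_PER_DAY).to_s

puts CANCELLATION_PERIOD_SKETCH # prints 259200
```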
Class Method Summary
- .included(base) ⇒ Object private
Instance Method Summary
- #arguments ⇒ Object
-
#around_iteration ⇒ Object
A hook to override that will be called around each iteration.
-
#build_enumerator ⇒ Enumerator
The enumerator to be iterated over.
-
#cancel! ⇒ Object
Set a flag in Redis to mark this job as cancelled.
- #cancelled? ⇒ Boolean
-
#each_iteration ⇒ void
The action to be performed on each item from the enumerator.
- #initialize ⇒ Object private
- #iteration_key ⇒ Object
-
#on_complete ⇒ Object
A hook to override that will be called when the job finishes iterating.
-
#on_resume ⇒ Object
A hook to override that will be called when the job resumes iterating.
-
#on_start ⇒ Object
A hook to override that will be called when the job starts iterating.
-
#on_stop ⇒ Object
A hook to override that will be called each time the job is interrupted.
- #perform(*args) ⇒ Object private
Methods included from Enumerators
#active_record_batches_enumerator, #active_record_records_enumerator, #active_record_relations_enumerator, #array_enumerator, #csv_batches_enumerator, #csv_enumerator
Class Method Details
.included(base) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 13

def self.included(base)
  base.extend(ClassMethods)
end
Instance Method Details
#arguments ⇒ Object
# File 'lib/sidekiq/job/iterable.rb', line 37

def arguments
  @_args
end
#around_iteration ⇒ Object
A hook to override that will be called around each iteration.
Useful for metrics collection, performance tracking, and similar instrumentation.
# File 'lib/sidekiq/job/iterable.rb', line 80

def around_iteration
  yield
end
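One way to use this hook for performance tracking is sketched below. It is a self-contained approximation that runs without Sidekiq: IterationSketch and its run driver are hypothetical stand-ins that only mirror the default hook shown above (a bare yield), and TimedIteration is the part a real job would resemble. The important detail is that the override must still yield, otherwise the iteration body never runs.

```ruby
# Stand-in for the piece of the iterable contract used here. This is NOT
# Sidekiq's implementation; it only mirrors the default hook (a bare yield).
class IterationSketch
  def around_iteration
    yield
  end

  # Hypothetical driver: wraps each unit of work in the hook.
  def run(items)
    items.each { |item| around_iteration { each_iteration(item) } }
  end
end

class TimedIteration < IterationSketch
  attr_reader :durations, :seen

  def initialize
    super
    @durations = []
    @seen = []
  end

  # Override the hook: time each iteration with a monotonic clock and
  # always yield so the iteration itself still runs.
  def around_iteration
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @durations << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end

  def each_iteration(item)
    @seen << item
  end
end

job = TimedIteration.new
job.run(%w[a b c])
puts job.durations.size # one duration is recorded per iteration
```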
#build_enumerator ⇒ Enumerator
The enumerator to be iterated over.
# File 'lib/sidekiq/job/iterable.rb', line 108

def build_enumerator(*)
  raise NotImplementedError, "#{self.class.name} must implement a '#build_enumerator' method"
end
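The #perform source on this page calls build_enumerator(*args, cursor: @_cursor), so an implementation is expected to accept a cursor: keyword and resume from it. The plain-Ruby sketch below illustrates that idea for an array. The helper name resumable_array_enumerator is hypothetical, and the choice to yield [item, cursor] pairs is an assumption that this page does not spell out; in a real job you would more likely return one of the included helpers such as #array_enumerator.

```ruby
# Hypothetical helper, not part of Sidekiq: yields [item, cursor] pairs and
# skips everything before the saved cursor, so a resumed run starts where
# the previous one stopped.
def resumable_array_enumerator(array, cursor:)
  raise ArgumentError, "array must be an Array" unless array.is_a?(Array)

  array.each_with_index.drop(cursor || 0).each
end

first_run = resumable_array_enumerator(%w[a b c d], cursor: nil)
p first_run.to_a # [["a", 0], ["b", 1], ["c", 2], ["d", 3]]

resumed = resumable_array_enumerator(%w[a b c d], cursor: 2)
p resumed.to_a   # [["c", 2], ["d", 3]]
```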
#cancel! ⇒ Object
Set a flag in Redis to mark this job as cancelled. Cancellation is asynchronous and is checked at the start of iteration and every 5 seconds thereafter as part of the recurring state flush.
# File 'lib/sidekiq/job/iterable.rb', line 49

def cancel!
  return @_cancelled if cancelled?

  key = "it-#{jid}"
  _, result, _ = Sidekiq.redis do |c|
    c.pipelined do |p|
      p.hsetnx(key, "cancelled", Time.now.to_i)
      p.hget(key, "cancelled")
      # TODO When Redis 7.2 is required
      # p.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
      p.expire(key, Sidekiq::Job::Iterable::STATE_TTL)
    end
  end
  @_cancelled = result.to_i
end
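The pipeline above relies on HSETNX writing the field only when it is absent, so the first cancellation timestamp wins and every later call reads the same value back. The sketch below imitates just that behaviour with an in-memory hash. FakeHashStore and the job id are made up for illustration; real code goes through Sidekiq.redis, and expiry is not modelled here.

```ruby
# In-memory stand-in for the single Redis hash used by cancel!.
# Hypothetical class, for illustration only.
class FakeHashStore
  def initialize
    @hashes = Hash.new { |h, k| h[k] = {} }
  end

  # Like HSETNX: write only if the field is absent; report whether we wrote.
  def hsetnx(key, field, value)
    return false if @hashes[key].key?(field)

    @hashes[key][field] = value.to_s
    true
  end

  # Like HGET: return the stored string, or nil when the field is missing.
  def hget(key, field)
    @hashes[key][field]
  end
end

store = FakeHashStore.new
key = "it-abc123" # same "it-<jid>" shape as the key above; the jid is made up

store.hsetnx(key, "cancelled", 1_700_000_000)
store.hsetnx(key, "cancelled", 1_700_000_500) # ignored: field already set

p store.hget(key, "cancelled").to_i # 1700000000
```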
#cancelled? ⇒ Boolean
# File 'lib/sidekiq/job/iterable.rb', line 65

def cancelled?
  @_cancelled
end
#each_iteration ⇒ void
This method returns an undefined value.
The action to be performed on each item from the enumerator.
# File 'lib/sidekiq/job/iterable.rb', line 119

def each_iteration(*)
  raise NotImplementedError, "#{self.class.name} must implement an '#each_iteration' method"
end
#initialize ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 26

def initialize
  super

  @_executions = 0
  @_cursor = nil
  @_start_time = nil
  @_runtime = 0
  @_args = nil
  @_cancelled = nil
end
#iteration_key ⇒ Object
# File 'lib/sidekiq/job/iterable.rb', line 123

def iteration_key
  "it-#{jid}"
end
#on_complete ⇒ Object
A hook to override that will be called when the job finishes iterating.
# File 'lib/sidekiq/job/iterable.rb', line 98

def on_complete
end
#on_resume ⇒ Object
A hook to override that will be called when the job resumes iterating.
# File 'lib/sidekiq/job/iterable.rb', line 86

def on_resume
end
#on_start ⇒ Object
A hook to override that will be called when the job starts iterating.
It is called only once, on the job's first execution; later executions call #on_resume instead.
# File 'lib/sidekiq/job/iterable.rb', line 73

def on_start
end
#on_stop ⇒ Object
A hook to override that will be called each time the job is interrupted.
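This can be because the job was interrupted or because Sidekiq is stopping. Per the #perform source, it also runs at the end of the final execution, just before #on_complete.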
# File 'lib/sidekiq/job/iterable.rb', line 93

def on_stop
end
#perform(*args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 128

def perform(*args)
  @_args = args.dup.freeze
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*args, cursor: @_cursor)
  unless enumerator
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  if @_executions == 1
    on_start
  else
    on_resume
  end

  completed = catch(:abort) do
    iterate_with_enumerator(enumerator, args)
  end

  on_stop
  completed = handle_completed(completed)

  if completed
    on_complete
    cleanup
  else
    reenqueue_iteration_job
  end
end
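The order in which the hooks fire can be hard to read off the source, so here is a simplified, self-contained model of it. LifecycleSketch is a hypothetical class, not Sidekiq code: it has no Redis state and no real re-enqueueing, max_items is an invented way to simulate an interruption, and it leaves out the catch(:abort) wrapper and handle_completed, whose details are not shown on this page.

```ruby
# Simplified model of the control flow in #perform. NOT Sidekiq's code.
class LifecycleSketch
  attr_reader :events, :processed

  def initialize
    @events = []
    @processed = []
    @executions = 0
    @next_index = 0 # plays the role of the saved cursor
  end

  # One call to #run stands for one execution of the job.
  def run(items, max_items: nil)
    @executions += 1
    (@executions == 1) ? on_start : on_resume

    completed = iterate(items, max_items)

    on_stop # runs after every execution, finished or not
    if completed
      on_complete
    else
      @events << :reenqueue # stand-in for reenqueue_iteration_job
    end
    completed
  end

  def on_start; @events << :on_start; end
  def on_resume; @events << :on_resume; end
  def on_stop; @events << :on_stop; end
  def on_complete; @events << :on_complete; end

  def each_iteration(item)
    @processed << item
  end

  private

  # Returns true when every item was handled, false when "interrupted".
  def iterate(items, max_items)
    handled = 0
    items.each_with_index.drop(@next_index).each do |item, index|
      return false if max_items && handled >= max_items

      each_iteration(item)
      @next_index = index + 1
      handled += 1
    end
    true
  end
end

job = LifecycleSketch.new
job.run(%w[a b c], max_items: 2) # first execution, interrupted after two items
job.run(%w[a b c])               # second execution resumes at "c"

p job.processed # ["a", "b", "c"]
p job.events
# [:on_start, :on_stop, :reenqueue, :on_resume, :on_stop, :on_complete]
```

In this model, as in the source above, the stop hook fires on both executions, while the start and complete hooks each fire once.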