Module: Sidekiq::Job::Iterable
- Includes: Enumerators
- Defined in:
  lib/sidekiq/job/iterable.rb,
  lib/sidekiq/job/iterable/enumerators.rb,
  lib/sidekiq/job/iterable/csv_enumerator.rb,
  lib/sidekiq/job/iterable/active_record_enumerator.rb
Defined Under Namespace
Modules: ClassMethods, Enumerators
Classes: ActiveRecordEnumerator, CsvEnumerator
Constant Summary
- CANCELLATION_PERIOD = (3 * 86_400).to_s
Three days is the longest period you generally need to wait for a retry to execute when using the default retry scheme. We don’t want to “forget” the job is cancelled before it has a chance to execute and cancel itself.
Instance Attribute Summary
- #current_object ⇒ Object (readonly)
Access to the current object while iterating.
Class Method Summary
- .included(base) ⇒ Object private
Instance Method Summary
- #arguments ⇒ Object
- #around_iteration ⇒ Object
A hook to override that will be called around each iteration.
- #build_enumerator ⇒ Enumerator
The enumerator to be iterated over.
- #cancel! ⇒ Object
Set a flag in Redis to mark this job as cancelled.
- #cancelled? ⇒ Boolean
- #cursor ⇒ Object
- #each_iteration ⇒ void
The action to be performed on each item from the enumerator.
- #initialize ⇒ Object private
- #iteration_key ⇒ Object
- #on_cancel ⇒ Object
A hook to override that will be called when the job is cancelled.
- #on_complete ⇒ Object
A hook to override that will be called when the job finishes iterating.
- #on_resume ⇒ Object
A hook to override that will be called when the job resumes iterating.
- #on_start ⇒ Object
A hook to override that will be called when the job starts iterating.
- #on_stop ⇒ Object
A hook to override that will be called each time the job is interrupted.
- #perform(*args) ⇒ Object private
Methods included from Enumerators
#active_record_batches_enumerator, #active_record_records_enumerator, #active_record_relations_enumerator, #array_enumerator, #csv_batches_enumerator, #csv_enumerator
Instance Attribute Details
#current_object ⇒ Object (readonly)
Access to the current object while iterating. This value is not reset so the latest element is explicitly available to cleanup/complete callbacks.
# File 'lib/sidekiq/job/iterable.rb', line 41
def current_object
  @current_object
end
Class Method Details
.included(base) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 13
def self.included(base)
  base.extend(ClassMethods)
end
Instance Method Details
#arguments ⇒ Object
# File 'lib/sidekiq/job/iterable.rb', line 43
def arguments
  @_args
end
#around_iteration ⇒ Object
A hook to override that will be called around each iteration.
This can be useful for metrics collection, performance tracking, etc.
# File 'lib/sidekiq/job/iterable.rb', line 88
def around_iteration
  yield
end
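As a rough illustration of the metrics use case, the sketch below overrides the hook to time each iteration with a monotonic clock. `TimedJob`, `durations`, `processed`, and the `run` driver are hypothetical names; the driver only stands in for Sidekiq's iteration loop so that the snippet runs without the gem, and it is not how Sidekiq itself invokes the hook.

```ruby
# Hypothetical stand-in for a job class. A real job would include
# Sidekiq::Job and Sidekiq::Job::Iterable instead of defining #run.
class TimedJob
  attr_reader :durations, :processed

  def initialize
    @durations = []
    @processed = []
  end

  # Overrides the hook: time whatever happens inside the iteration,
  # recording the duration even if the iteration raises.
  def around_iteration
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @durations << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end

  def each_iteration(item)
    @processed << item * 2
  end

  # Assumed driver, not Sidekiq code: wraps each item in the hook.
  def run(items)
    items.each { |item| around_iteration { each_iteration(item) } }
  end
end

job = TimedJob.new
job.run([1, 2, 3])
```

An override must call `yield` exactly once, since the default implementation does nothing else.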
#build_enumerator ⇒ Enumerator
The enumerator to be iterated over.
# File 'lib/sidekiq/job/iterable.rb', line 121
def build_enumerator(*)
  raise NotImplementedError, "#{self.class.name} must implement a '#build_enumerator' method"
end
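The `#perform` listing on this page calls `build_enumerator(*args, cursor: @_cursor)`, so an implementation receives the job arguments plus a `cursor:` keyword. The plain-Ruby sketch below shows one possible shape, assuming the enumerator yields `[item, cursor]` pairs and that the cursor is the array index to resume from. `LetterJob` and `ITEMS` are hypothetical, and a real job would more likely use one of the included helpers such as `#array_enumerator`.

```ruby
# Hypothetical job-like class; no Sidekiq required to run this sketch.
class LetterJob
  ITEMS = %w[a b c d e].freeze

  # Returns an Enumerator of [item, index] pairs, skipping everything
  # before the saved cursor. A nil cursor means "start from the beginning".
  def build_enumerator(cursor:)
    ITEMS.each_with_index.drop(cursor || 0).each
  end
end

job = LetterJob.new
fresh = job.build_enumerator(cursor: nil).to_a
resumed = job.build_enumerator(cursor: 3).to_a
```

Here `fresh` covers all five pairs, while `resumed` starts at the pair whose index is 3.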
#cancel! ⇒ Object
Set a flag in Redis to mark this job as cancelled. Cancellation is asynchronous and is checked at the start of iteration and every 5 seconds thereafter as part of the recurring state flush.
# File 'lib/sidekiq/job/iterable.rb', line 55
def cancel!
  return @_cancelled if cancelled?

  key = "it-#{jid}"
  _, result, _ = Sidekiq.redis do |c|
    c.pipelined do |p|
      p.hsetnx(key, "cancelled", Time.now.to_i)
      p.hget(key, "cancelled")
      p.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
    end
  end
  @_cancelled = result.to_i
end
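The listing above writes the flag with `HSETNX` and then reads it back, which means the first cancellation timestamp is the one that sticks. The sketch below models only that behaviour with an in-memory hash. `FakeHash` is a hypothetical stand-in for a single Redis hash, not a Sidekiq or Redis client class, and the timestamps are made up.

```ruby
# Hypothetical in-memory stand-in for one Redis hash.
class FakeHash
  def initialize
    @fields = {}
  end

  # Like HSETNX: writes only when the field is absent.
  def hsetnx(field, value)
    return false if @fields.key?(field)

    @fields[field] = value.to_s
    true
  end

  # Like HGET: returns the stored string, or nil if the field is missing.
  def hget(field)
    @fields[field]
  end
end

state = FakeHash.new
first_write = state.hsetnx("cancelled", 1_700_000_000)  # first cancel wins
second_write = state.hsetnx("cancelled", 1_700_000_500) # later call is a no-op
cancelled_at = state.hget("cancelled").to_i
```

Reading the value back after the conditional write is what lets repeated `cancel!` calls agree on a single timestamp.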
#cancelled? ⇒ Boolean
# File 'lib/sidekiq/job/iterable.rb', line 69
def cancelled?
  @_cancelled
end
#cursor ⇒ Object
# File 'lib/sidekiq/job/iterable.rb', line 73
def cursor
  @_cursor.freeze
end
#each_iteration ⇒ void
This method returns an undefined value.
The action to be performed on each item from the enumerator.
# File 'lib/sidekiq/job/iterable.rb', line 132
def each_iteration(*)
  raise NotImplementedError, "#{self.class.name} must implement an '#each_iteration' method"
end
#initialize ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 26
def initialize
  super

  @_executions = 0
  @_cursor = nil
  @_start_time = nil
  @_runtime = 0
  @_args = nil
  @_cancelled = nil
  @current_object = nil
end
#iteration_key ⇒ Object
# File 'lib/sidekiq/job/iterable.rb', line 136
def iteration_key
  "it-#{jid}"
end
#on_cancel ⇒ Object
A hook to override that will be called when the job is cancelled.
# File 'lib/sidekiq/job/iterable.rb', line 106
def on_cancel
end
#on_complete ⇒ Object
A hook to override that will be called when the job finishes iterating.
# File 'lib/sidekiq/job/iterable.rb', line 111
def on_complete
end
#on_resume ⇒ Object
A hook to override that will be called when the job resumes iterating.
# File 'lib/sidekiq/job/iterable.rb', line 94
def on_resume
end
#on_start ⇒ Object
A hook to override that will be called when the job starts iterating.
It is called only once, on the job's first execution.
# File 'lib/sidekiq/job/iterable.rb', line 81
def on_start
end
#on_stop ⇒ Object
A hook to override that will be called each time the job is interrupted.
This can happen when the job is interrupted mid-iteration or when Sidekiq is stopping.
# File 'lib/sidekiq/job/iterable.rb', line 101
def on_stop
end
#perform(*args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/sidekiq/job/iterable.rb', line 141
def perform(*args)
  @_args = args.dup.freeze
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*args, cursor: @_cursor)
  unless enumerator
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  if @_executions == 1
    on_start
  else
    on_resume
  end

  completed = catch(:abort) do
    iterate_with_enumerator(enumerator, args)
  end

  on_stop
  completed = handle_completed(completed)

  if completed
    on_complete
    cleanup
  else
    reenqueue_iteration_job
  end
end
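The order in which the hooks fire can be easier to follow in a stripped-down model. The sketch below is a plain-Ruby approximation of the control flow in the listing above, not Sidekiq code: `LifecycleSketch`, `run`, `iterate`, and the `stop_after:` parameter are hypothetical, interruption is simulated by returning `false` from the loop, and the execution count lives in the instance rather than being restored from Redis.

```ruby
# Hypothetical model of the hook ordering; records events instead of
# doing real work.
class LifecycleSketch
  def initialize
    @executions = 0
  end

  # Returns the list of events for one execution of the job.
  def run(items, stop_after: nil)
    events = []
    @executions += 1
    events << (@executions == 1 ? :on_start : :on_resume)

    # Mirrors the catch(:abort) wrapper around the iteration loop.
    completed = catch(:abort) { iterate(items, stop_after, events) }

    events << :on_stop
    events << (completed ? :on_complete : :reenqueue)
    events
  end

  private

  # Returns true when every item was processed, false when the
  # simulated interruption kicked in.
  def iterate(items, stop_after, events)
    items.each_with_index do |item, index|
      return false if stop_after && index >= stop_after

      events << [:each_iteration, item]
    end
    true
  end
end

sketch = LifecycleSketch.new
first = sketch.run(%w[a b c], stop_after: 2) # interrupted after two items
second = sketch.run(%w[c])                   # resumed run finishes the work
```

In this model, as in the listing, `on_stop` is recorded on both runs because it is called before the completion check.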