Class: Chewy::Strategy::DelayedSidekiq::Scheduler
- Inherits:
-
Object
- Object
- Chewy::Strategy::DelayedSidekiq::Scheduler
- Defined in:
- lib/chewy/strategy/delayed_sidekiq/scheduler.rb
Constant Summary collapse
- DEFAULT_TTL =
in seconds
60 * 60 * 24
- DEFAULT_LATENCY =
10
- DEFAULT_MARGIN =
2
- DEFAULT_QUEUE =
'chewy'
- KEY_PREFIX =
'chewy:delayed_sidekiq'
- FALLBACK_FIELDS =
'all'
- FIELDS_IDS_SEPARATOR =
';'
- IDS_SEPARATOR =
','
Instance Method Summary collapse
-
#initialize(type, ids, options = {}) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#postpone ⇒ Object
the diagram:.
Constructor Details
#initialize(type, ids, options = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
56 57 58 59 60 |
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 56 def initialize(type, ids, = {}) @type = type @ids = ids @options = end |
Instance Method Details
#postpone ⇒ Object
the diagram:
inputs: latency == 2 reindex_time = Time.current
Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis):
|
process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1] Schedule.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 | | process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2] Schedule.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4] | chewy:delayed_sidekiq:timechunks = [ | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"} | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"} | ] | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex), | chewy:delayed_sidekiq:CitiesIndex:1679347868
98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 98 def postpone ::Sidekiq.redis do |redis| # do the redis stuff in a single command to avoid concurrency issues if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl]) ::Sidekiq::Client.push( 'queue' => sidekiq_queue, 'at' => at + margin, 'class' => Chewy::Strategy::DelayedSidekiq::Worker, 'args' => [type_name, at] ) end end end |