Class: Chewy::Strategy::DelayedSidekiq::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/chewy/strategy/delayed_sidekiq/scheduler.rb

Constant Summary collapse

DEFAULT_TTL =

in seconds

60 * 60 * 24
DEFAULT_LATENCY =
10
DEFAULT_MARGIN =
2
DEFAULT_QUEUE =
'chewy'
KEY_PREFIX =
'chewy:delayed_sidekiq'
FALLBACK_FIELDS =
'all'
FIELDS_IDS_SEPARATOR =
';'
IDS_SEPARATOR =
','

Instance Method Summary collapse

Constructor Details

#initialize(type, ids, options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



56
57
58
59
60
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 56

def initialize(type, ids, options = {})
  @type = type
  @ids = ids
  @options = options
end

Instance Method Details

#postponeObject

the diagram:

inputs: latency == 2 reindex_time = Time.current

Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis):


                                                  |

process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1] Schedule.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 | | process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2] Schedule.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4] | chewy:delayed_sidekiq:timechunks = [ | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"} | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"} | ] | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex), | chewy:delayed_sidekiq:CitiesIndex:1679347868



98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 98

def postpone
  ::Sidekiq.redis do |redis|
    # do the redis stuff in a single command to avoid concurrency issues
    if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl])
      ::Sidekiq::Client.push(
        'queue' => sidekiq_queue,
        'at' => at + margin,
        'class' => Chewy::Strategy::DelayedSidekiq::Worker,
        'args' => [type_name, at]
      )
    end
  end
end