Class: Chewy::Strategy::DelayedSidekiq::Worker
- Inherits:
- Object
- Inheritance hierarchy:
  - Object
  - Chewy::Strategy::DelayedSidekiq::Worker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/chewy/strategy/delayed_sidekiq/worker.rb
Constant Summary collapse
# Lua script run through Redis EVAL by #perform.
#
# Arguments (ARGV): [1] the type name, [2] a numeric score, [3] the key prefix.
# It collects the members of every set referenced from the
# "<prefix>:<type>:timechunks" sorted set whose score is <= the given score,
# deletes those sets, removes the matching sorted-set entries, and returns
# the collected members.
#
# The script must keep its line breaks: Lua `--` comments run to the end of
# the line, so a single-line version would comment out everything after the
# first comment.
LUA_SCRIPT = <<~LUA
  local type = ARGV[1]
  local score = tonumber(ARGV[2])
  local prefix = ARGV[3]
  local timechunks_key = prefix .. ":" .. type .. ":timechunks"

  -- Get timechunk_keys with scores less than or equal to the specified score
  local timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score)

  -- Get all members from the sets associated with the timechunk_keys
  local members = {}
  for _, timechunk_key in ipairs(timechunk_keys) do
    local set_members = redis.call('smembers', timechunk_key)
    for _, member in ipairs(set_members) do
      table.insert(members, member)
    end
  end

  -- Remove timechunk_keys and their associated sets
  for _, timechunk_key in ipairs(timechunk_keys) do
    redis.call('del', timechunk_key)
  end

  -- Remove timechunks with scores less than or equal to the specified score
  redis.call('zremrangebyscore', timechunks_key, '-inf', score)

  return members
LUA
Instance Method Summary collapse
Instance Method Details
#perform(type, score, options = {}) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/chewy/strategy/delayed_sidekiq/worker.rb', line 38

# Fetches the members accumulated for +type+ up to +score+ from Redis and
# imports them into the corresponding index.
#
# @param type [String] name of the index class; resolved with +constantize+
# @param score [Object] upper score bound handed to LUA_SCRIPT as ARGV[2]
# @param options [Hash] import options; this hash is mutated (+:refresh+ and
#   +:update_fields+ may be set) and then forwarded to +import!+
# @return [Object] whatever the +::Sidekiq.redis+ block call returns
def perform(type, score, options = {})
  # When async refresh is disabled, this forces refresh: false on the import.
  options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async

  ::Sidekiq.redis do |redis|
    # Collects the pending members and removes them from Redis in one script call.
    members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX])

    # Split the returned members into ids and (optional) fields to update.
    ids, fields = extract_ids_and_fields(members)
    options[:update_fields] = fields if fields

    index = type.constantize
    index.strategy_config.delayed_sidekiq.reindex_wrapper.call do
      # Only splat the options when there are any; otherwise call import! with ids alone.
      options.any? ? index.import!(ids, **options) : index.import!(ids)
    end
  end
end