Class: RedisMigrationWorker

Inherits:
Object
  • Object
show all
Includes:
ApplicationWorker
Defined in:
app/workers/redis_migration_worker.rb

Constant Summary collapse

SCAN_START_STOP =
'0'

Constants included from ApplicationWorker

ApplicationWorker::LOGGING_EXTRA_KEY, ApplicationWorker::SAFE_PUSH_BULK_LIMIT

Constants included from Gitlab::Loggable

Gitlab::Loggable::ANONYMOUS

Constants included from WorkerAttributes

WorkerAttributes::DEFAULT_DATA_CONSISTENCY, WorkerAttributes::DEFAULT_DEFER_DELAY, WorkerAttributes::NAMESPACE_WEIGHTS, WorkerAttributes::VALID_DATA_CONSISTENCIES, WorkerAttributes::VALID_RESOURCE_BOUNDARIES, WorkerAttributes::VALID_URGENCIES

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Gitlab::Loggable

#build_structured_payload

Methods included from Gitlab::SidekiqVersioning::Worker

#job_version

Methods included from WorkerContext

#with_context

Class Method Details

.fetch_migrator!(job_class_name) ⇒ Object

Raises:

  • (NotImplementedError)


33
34
35
36
37
38
# File 'app/workers/redis_migration_worker.rb', line 33

# Builds a migrator instance for the given job class name.
#
# The name is resolved under the Gitlab::BackgroundMigration::Redis namespace.
#
# @param job_class_name [String] unqualified name of the migrator class
# @return [Object] a new instance of the resolved class
# @raise [NotImplementedError] when no such constant can be resolved
def fetch_migrator!(job_class_name)
  migrator_class = "Gitlab::BackgroundMigration::Redis::#{job_class_name}".safe_constantize
  return migrator_class.new unless migrator_class.nil?

  raise NotImplementedError, "#{job_class_name} does not exist"
end

Instance Method Details

#perform(job_class_name, cursor, options = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'app/workers/redis_migration_worker.rb', line 14

# Scans Redis from +cursor+ and hands every non-empty batch of keys to the
# migrator. Scanning runs for up to three minutes per job; if the scan has
# not completed by then, the job re-enqueues itself with the latest cursor.
#
# @param job_class_name [String] migrator class name, resolved through
#   +fetch_migrator!+
# @param cursor [String] SCAN cursor to resume from
# @param options [Hash] optional settings; +scan_size+ (string or symbol key)
#   is forwarded as the SCAN +count+ and defaults to 1000
# @return [Object, nil] the result of +perform_async+ when re-enqueued,
#   otherwise nil
def perform(job_class_name, cursor, options = {})
  migrator = self.class.fetch_migrator!(job_class_name)

  # Sidekiq round-trips job arguments through JSON, so an enqueued options
  # hash arrives with string keys. Check the string key first and keep the
  # symbol key for direct (non-enqueued) callers.
  scan_size = options['scan_size'] || options[:scan_size] || 1000
  deadline = Time.now.utc + 3.minutes

  while Time.now.utc < deadline
    cursor, keys = migrator.redis.scan(cursor, match: migrator.scan_match_pattern, count: scan_size)

    migrator.perform(keys) if keys.any?

    # Brief pause between SCAN calls; presumably to limit load on Redis.
    sleep(0.01)
    # A returned cursor equal to SCAN_START_STOP means the scan is complete.
    break if cursor == SCAN_START_STOP
  end

  # Out of time but not finished: continue in a fresh job from the last cursor.
  self.class.perform_async(job_class_name, cursor, options) unless cursor == SCAN_START_STOP
end