Class: Gitlab::SidekiqMiddleware::PauseControl::PauseControlService

Inherits:
Object
  • Object
show all
Defined in:
lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb

Constant Summary collapse

LIMIT =

Class for managing queues for paused workers. When a worker is paused, all of its jobs are saved in a separate sorted set in Redis.

1000
PROJECT_CONTEXT_KEY =
"#{Gitlab::ApplicationContext::LOG_KEY}.project".freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_name) ⇒ PauseControlService

Returns a new instance of PauseControlService.



12
13
14
15
16
17
18
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 12

# Builds a service bound to a single worker class name.
#
# The original name is kept in @worker_name; the Redis key names are derived
# from its underscored form.
#
# @param worker_name [String] worker class name (must respond to #underscore)
def initialize(worker_name)
  @worker_name = worker_name

  # NOTE(review): the `{...}` wrapper presumably acts as a Redis Cluster hash
  # tag so that both keys land in the same slot — confirm.
  key_suffix = worker_name.underscore
  @redis_set_key = "sidekiq:pause_control:paused_jobs:zset:{#{key_suffix}}"
  @redis_score_key = "sidekiq:pause_control:paused_jobs:score:{#{key_suffix}}"
end

Class Method Details

.add_to_waiting_queue!(worker_name, args, context) ⇒ Object



21
22
23
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 21

# Class-level shortcut: builds a service for +worker_name+ and forwards the
# job to its instance-level #add_to_waiting_queue!.
#
# @param worker_name [String] worker class name
# @param args [Object] job arguments, passed through unchanged
# @param context [Object] job context, passed through unchanged
# @return [Object] whatever the instance method returns
def add_to_waiting_queue!(worker_name, args, context)
  service = new(worker_name)
  service.add_to_waiting_queue!(args, context)
end

.has_jobs_in_waiting_queue?(worker_name) ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 25

# Class-level shortcut: builds a service for +worker_name+ and asks it
# whether its waiting queue holds any jobs.
#
# @param worker_name [String] worker class name
# @return [Boolean]
def has_jobs_in_waiting_queue?(worker_name)
  service = new(worker_name)
  service.has_jobs_in_waiting_queue?
end

.queue_size(worker_name) ⇒ Object



33
34
35
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 33

# Class-level shortcut: builds a service for +worker_name+ and returns the
# result of its instance-level #queue_size.
#
# @param worker_name [String] worker class name
# @return [Object] whatever the instance method returns
def queue_size(worker_name)
  service = new(worker_name)
  service.queue_size
end

.resume_processing!(worker_name) ⇒ Object



29
30
31
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 29

# Class-level shortcut: builds a service for +worker_name+ and runs its
# instance-level #resume_processing! with that method's default arguments.
#
# @param worker_name [String] worker class name
# @return [Object] whatever the instance method returns
def resume_processing!(worker_name)
  service = new(worker_name)
  service.resume_processing!
end

Instance Method Details

#add_to_waiting_queue!(args, context) ⇒ Object



38
39
40
41
42
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 38

# Stores one job in this worker's waiting sorted set.
#
# The member is the result of +serialize(args, context)+ and its score comes
# from +generate_unique_score+.
#
# @param args [Object] job arguments handed to +serialize+
# @param context [Object] job context handed to +serialize+
# @return [Object] the value yielded back by +with_redis+
#   (presumably the ZADD result — confirm against with_redis)
def add_to_waiting_queue!(args, context)
  with_redis do |redis|
    set_key = redis_set_key
    score = generate_unique_score(redis)
    payload = serialize(args, context)

    redis.zadd(set_key, score, payload)
  end
end

#has_jobs_in_waiting_queue? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 48

# Reports whether this worker's waiting-queue key currently exists in Redis.
#
# @return [Boolean] result of +redis.exists?+ for the sorted-set key
def has_jobs_in_waiting_queue?
  with_redis do |redis|
    redis.exists?(redis_set_key) # rubocop:disable CodeReuse/ActiveRecord
  end
end

#queue_size ⇒ Object



44
45
46
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 44

# Returns the cardinality (ZCARD) of this worker's waiting sorted set.
#
# @return [Object] the value returned by +redis.zcard+ for the sorted-set key
def queue_size
  with_redis do |redis|
    redis.zcard(redis_set_key)
  end
end

#resume_processing!(iterations: 1) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 52

# Moves waiting jobs back to the processing queue, one batch per iteration.
#
# Each iteration fetches the next batch, deserializes every job in it, sends
# each one to the processing queue, and only then removes the batch from the
# waiting queue. Iteration stops early once an empty batch is returned.
#
# After the loop, both Redis keys (score and sorted set) are deleted when the
# queue size is zero.
#
# @param iterations [Integer] maximum number of batches to process
# @return [Object] the queue size measured after processing
def resume_processing!(iterations: 1)
  with_redis do |redis|
    iterations.times do
      batch = next_batch_from_waiting_queue(redis)
      break if batch.empty?

      # Deserialize the whole batch before enqueuing anything.
      batch
        .map { |payload, _score| deserialize(payload) }
        .each { |job| send_to_processing_queue(job) }

      remove_jobs_from_waiting_queue(redis, batch)
    end

    queue_size.tap do |remaining|
      redis.del(redis_score_key, redis_set_key) if remaining.zero?
    end
  end
end