Class: Gitlab::SidekiqMiddleware::PauseControl::PauseControlService
- Inherits:
-
Object
- Object
- Gitlab::SidekiqMiddleware::PauseControl::PauseControlService
- Defined in:
- lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb
Constant Summary collapse
- LIMIT =
Class for managing queues for paused workers. When a worker is paused, all of its jobs are saved in a separate sorted set in Redis.
1000
- PROJECT_CONTEXT_KEY =
"#{Gitlab::ApplicationContext::LOG_KEY}.project".freeze
Class Method Summary collapse
- .add_to_waiting_queue!(worker_name, args, context) ⇒ Object
- .has_jobs_in_waiting_queue?(worker_name) ⇒ Boolean
- .queue_size(worker_name) ⇒ Object
- .resume_processing!(worker_name) ⇒ Object
Instance Method Summary collapse
- #add_to_waiting_queue!(args, context) ⇒ Object
- #has_jobs_in_waiting_queue? ⇒ Boolean
-
#initialize(worker_name) ⇒ PauseControlService
constructor
A new instance of PauseControlService.
- #queue_size ⇒ Object
- #resume_processing!(iterations: 1) ⇒ Object
Constructor Details
#initialize(worker_name) ⇒ PauseControlService
Returns a new instance of PauseControlService.
12 13 14 15 16 17 18 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 12

# Builds a service bound to a single worker.
#
# Stores the worker name as given and derives two Redis key names from its
# underscored form: one for the sorted set and one for the score key.
#
# @param worker_name [String] worker name; must respond to `underscore`
def initialize(worker_name)
  @worker_name = worker_name

  # The underscored name is wrapped in `{...}` inside both keys.
  # NOTE(review): presumably a Redis Cluster hash tag so both keys share a slot - confirm.
  key_fragment = worker_name.underscore

  @redis_set_key = "sidekiq:pause_control:paused_jobs:zset:{#{key_fragment}}"
  @redis_score_key = "sidekiq:pause_control:paused_jobs:score:{#{key_fragment}}"
end
Class Method Details
.add_to_waiting_queue!(worker_name, args, context) ⇒ Object
21 22 23 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 21

# Class-level shortcut: instantiates the service for +worker_name+ and
# delegates to the instance method of the same name.
#
# @param worker_name [String] worker the job belongs to
# @param args arguments forwarded unchanged to the instance method
# @param context context forwarded unchanged to the instance method
# @return whatever the instance-level call returns
def add_to_waiting_queue!(worker_name, args, context)
  service = new(worker_name)
  service.add_to_waiting_queue!(args, context)
end
.has_jobs_in_waiting_queue?(worker_name) ⇒ Boolean
25 26 27 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 25

# Class-level shortcut: instantiates the service for +worker_name+ and asks
# the instance whether its waiting queue holds any jobs.
#
# @param worker_name [String] worker to check
# @return [Boolean] result of the instance-level check
def has_jobs_in_waiting_queue?(worker_name)
  service = new(worker_name)
  service.has_jobs_in_waiting_queue?
end
.queue_size(worker_name) ⇒ Object
33 34 35 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 33

# Class-level shortcut: instantiates the service for +worker_name+ and
# returns the instance's waiting-queue size.
#
# @param worker_name [String] worker to inspect
# @return whatever the instance-level +queue_size+ returns
def queue_size(worker_name)
  service = new(worker_name)
  service.queue_size
end
.resume_processing!(worker_name) ⇒ Object
29 30 31 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 29

# Class-level shortcut: instantiates the service for +worker_name+ and calls
# the instance-level +resume_processing!+ with its default arguments.
#
# @param worker_name [String] worker whose waiting jobs should be resumed
# @return whatever the instance-level call returns
def resume_processing!(worker_name)
  service = new(worker_name)
  service.resume_processing!
end
Instance Method Details
#add_to_waiting_queue!(args, context) ⇒ Object
38 39 40 41 42 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 38

# Adds one job to this worker's sorted set via ZADD.
#
# The member is the result of `serialize(args, context)` and the score comes
# from `generate_unique_score(redis)`.
#
# @param args job arguments, passed to `serialize`
# @param context job context, passed to `serialize`
# @return the value returned by `with_redis` (the ZADD result if the block value is passed through)
def add_to_waiting_queue!(args, context)
  with_redis do |redis|
    # Score is computed before the payload, matching the original evaluation order.
    score = generate_unique_score(redis)
    payload = serialize(args, context)

    redis.zadd(redis_set_key, score, payload)
  end
end
#has_jobs_in_waiting_queue? ⇒ Boolean
48 49 50 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 48

# Reports whether this worker's sorted-set key currently exists in Redis.
#
# @return [Boolean] result of `exists?` on the sorted-set key
def has_jobs_in_waiting_queue?
  with_redis do |redis|
    redis.exists?(redis_set_key) # rubocop:disable CodeReuse/ActiveRecord
  end
end
#queue_size ⇒ Object
44 45 46 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 44

# Returns the cardinality (ZCARD) of this worker's sorted set.
#
# @return the value ZCARD reports for the sorted-set key
def queue_size
  with_redis do |redis|
    redis.zcard(redis_set_key)
  end
end
#resume_processing!(iterations: 1) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb', line 52

# Moves waiting jobs back to processing, one batch per iteration.
#
# Each iteration fetches a batch, deserializes every entry, hands each job to
# `send_to_processing_queue`, and then removes the batch from the waiting
# queue. Iteration stops early once a fetched batch is empty. Afterwards the
# remaining queue size is read; when it is zero, both the score key and the
# sorted-set key are deleted.
#
# @param iterations [Integer] maximum number of batches to process (default 1)
# @return the value returned by `with_redis` (the remaining queue size if the block value is passed through)
def resume_processing!(iterations: 1)
  with_redis do |redis|
    iterations.times do
      batch = next_batch_from_waiting_queue(redis)
      break if batch.empty?

      # Deserialize the whole batch before dispatching anything, so a bad
      # payload raises before any job from this batch has been sent.
      batch
        .map { |payload, _score| deserialize(payload) }
        .each { |job| send_to_processing_queue(job) }

      remove_jobs_from_waiting_queue(redis, batch)
    end

    remaining = queue_size

    # NOTE(review): a job added between the size read and this DEL would be
    # removed along with the keys - confirm whether callers guard against that.
    redis.del(redis_score_key, redis_set_key) if remaining.zero?

    remaining
  end
end