Class: Gitlab::JobWaiter
- Inherits:
-
Object
- Object
- Gitlab::JobWaiter
- Defined in:
- lib/gitlab/job_waiter.rb
Overview
JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
Its use requires the cooperation of the sidekiq jobs themselves. Set up the waiter, then start the jobs, passing them its `key`. Their `perform` methods should look like:
def perform(args, notify_key)
# do work
ensure
::Gitlab::JobWaiter.notify(notify_key, jid)
end
The JobWaiter blocks popping items from a Redis array. All the sidekiq jobs push to that array when done. Once the waiter has popped `count` items, it knows all the jobs are done.
Constant Summary collapse
- KEY_PREFIX =
"gitlab:job_waiter"
- STARTED_METRIC =
:gitlab_job_waiter_started_total
- TIMEOUTS_METRIC =
:gitlab_job_waiter_timeouts_total
Instance Attribute Summary collapse
-
#finished ⇒ Object
readonly
Returns the value of attribute finished.
-
#jobs_remaining ⇒ Object
Returns the value of attribute jobs_remaining.
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#worker_label ⇒ Object
readonly
Returns the value of attribute worker_label.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(jobs_remaining = 0, key = "#{KEY_PREFIX}:#{SecureRandom.uuid}", worker_label: nil) ⇒ JobWaiter
constructor
jobs_remaining - the number of jobs left to wait for key - The key of this waiter.
-
#wait(timeout = 10) ⇒ Object
Waits for all the jobs to be completed.
Constructor Details
#initialize(jobs_remaining = 0, key = "#{KEY_PREFIX}:#{SecureRandom.uuid}", worker_label: nil) ⇒ JobWaiter
jobs_remaining - the number of jobs left to wait for key - The key of this waiter.
38 39 40 41 42 43 |
# File 'lib/gitlab/job_waiter.rb', line 38 def initialize(jobs_remaining = 0, key = "#{KEY_PREFIX}:#{SecureRandom.uuid}", worker_label: nil) @key = key @jobs_remaining = jobs_remaining @finished = [] @worker_label = worker_label end |
Instance Attribute Details
#finished ⇒ Object (readonly)
Returns the value of attribute finished
33 34 35 |
# File 'lib/gitlab/job_waiter.rb', line 33 def finished @finished end |
#jobs_remaining ⇒ Object
Returns the value of attribute jobs_remaining
34 35 36 |
# File 'lib/gitlab/job_waiter.rb', line 34 def jobs_remaining @jobs_remaining end |
#key ⇒ Object (readonly)
Returns the value of attribute key
33 34 35 |
# File 'lib/gitlab/job_waiter.rb', line 33 def key @key end |
#worker_label ⇒ Object (readonly)
Returns the value of attribute worker_label
33 34 35 |
# File 'lib/gitlab/job_waiter.rb', line 33 def worker_label @worker_label end |
Class Method Details
.key?(key) ⇒ Boolean
29 30 31 |
# File 'lib/gitlab/job_waiter.rb', line 29 def self.key?(key) key.is_a?(String) && key =~ /\A#{KEY_PREFIX}:\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/ end |
.notify(key, jid) ⇒ Object
25 26 27 |
# File 'lib/gitlab/job_waiter.rb', line 25 def self.notify(key, jid) Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) } end |
Instance Method Details
#wait(timeout = 10) ⇒ Object
Waits for all the jobs to be completed.
timeout - The maximum amount of seconds to block the caller for. This
ensures we don't indefinitely block a caller in case a job takes
long to process, or is never processed.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/gitlab/job_waiter.rb', line 50 def wait(timeout = 10) deadline = Time.now.utc + timeout increment_counter(STARTED_METRIC) Gitlab::Redis::SharedState.with do |redis| # Fallback key expiry: allow a long grace period to reduce the chance of # a job pushing to an expired key and recreating it redis.expire(key, [timeout * 2, 10.minutes.to_i].max) while jobs_remaining > 0 # Redis will not take fractional seconds. Prefer waiting too long over # not waiting long enough seconds_left = (deadline - Time.now.utc).ceil # Redis interprets 0 as "wait forever", so skip the final `blpop` call break if seconds_left <= 0 list, jid = redis.blpop(key, timeout: seconds_left) # timed out unless list && jid increment_counter(TIMEOUTS_METRIC) break end @finished << jid @jobs_remaining -= 1 end # All jobs have finished, so expire the key immediately redis.expire(key, 0) if jobs_remaining == 0 end finished end |