Class: Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob
- Inherits:
- Object
- Object
- Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob
- Includes:
- Utils::StrongMemoize
- Defined in:
- lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
Overview
This class defines an identifier for a job in a queue. The identifier is based on the job's class and arguments.
A strategy decides when to keep track of the job in Redis and when to remove it.
The deduplication key is stored in Redis by calling `#check!`. `#check!` returns the `jid` of the job if it was scheduled, or the `jid` of the duplicate job if one was already scheduled.
When new jobs can be scheduled again, the strategy calls `#delete!`.
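To make the idea of an identifier built from a job's class and arguments concrete, here is a minimal sketch. It is not the implementation used by this class: the helper name `sketch_idempotency_key`, the key layout, and the choice of SHA256 over a JSON payload are all assumptions made for illustration.

```ruby
require 'digest'
require 'json'

# Hypothetical helper, not GitLab's implementation: derive a deduplication
# key from the queue name, the worker class name and the job arguments.
def sketch_idempotency_key(queue_name, worker_class_name, arguments)
  payload = JSON.generate([worker_class_name, arguments])
  "#{queue_name}:#{Digest::SHA256.hexdigest(payload)}"
end

key_a = sketch_idempotency_key('default', 'ExampleWorker', [1, 'foo'])
key_b = sketch_idempotency_key('default', 'ExampleWorker', [1, 'foo'])
key_c = sketch_idempotency_key('default', 'ExampleWorker', [2, 'foo'])

puts key_a == key_b # jobs with the same class and arguments share a key
puts key_a == key_c # changing the arguments changes the key
```

Because the key depends only on the queue, class and arguments, two jobs that would do the same work map to the same Redis key, which is what lets a strategy detect duplicates.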
Constant Summary
- DEFAULT_DUPLICATE_KEY_TTL = 6.hours
- DEFAULT_STRATEGY = :until_executing
- STRATEGY_NONE = :none
- UPDATE_WAL_COOKIE_SCRIPT =
Generally speaking, updating a Redis key by deserializing and serializing it on the Redis server is bad for performance. However, in the case of DuplicateJobs we know that key updates are rare, and that the most common operations are setting, getting and deleting the key. The aim of this design is to make the common operations as fast as possible.
<<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)

  for i = 1, #ARGV, 3 do
    local connection = ARGV[i]
    local current_offset = cookie.offsets[connection]
    local new_offset = tonumber(ARGV[i+1])
    if not current_offset or (new_offset and current_offset < new_offset) then
      cookie.offsets[connection] = new_offset
      cookie.wal_locations[connection] = ARGV[i+2]
    end
  end

  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "keepttl")
LUA
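The update rule in the script above can be easier to follow in plain Ruby. The sketch below mirrors only the loop logic on an in-memory hash. It does not talk to Redis or use msgpack, and the method name `sketch_update_wal_cookie` and the sample values are assumptions for illustration.

```ruby
# Plain-Ruby mirror of the update rule: arguments arrive in groups of three
# (connection name, offset, WAL location). An entry is overwritten only when
# no offset is stored yet, or when the new offset is larger than the stored one.
def sketch_update_wal_cookie(cookie, argv)
  argv.each_slice(3) do |connection, offset, location|
    current_offset = cookie['offsets'][connection]
    # Like Lua's tonumber, yield nil when the value is not numeric.
    new_offset = offset && Float(offset, exception: false)

    next unless current_offset.nil? || (new_offset && current_offset < new_offset)

    cookie['offsets'][connection] = new_offset
    cookie['wal_locations'][connection] = location
  end

  cookie
end

cookie = {
  'jid' => 'abc',
  'offsets' => { 'main' => 100.0 },
  'wal_locations' => { 'main' => '0/64' }
}

# 'main' is behind the stored offset and is ignored; 'ci' is new and is stored.
sketch_update_wal_cookie(cookie, ['main', '50', '0/32', 'ci', '10', '0/A'])
# 'main' is now ahead of the stored offset and replaces it.
sketch_update_wal_cookie(cookie, ['main', '200', '0/C8'])
```

Running the comparison on the server inside one script keeps the read, compare and write steps atomic, which a client-side version of this loop would not be.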
- DEDUPLICATED_SCRIPT =
<<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)
  cookie.deduplicated = "1"
  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "keepttl")
LUA
Instance Attribute Summary
- #existing_jid ⇒ Object (readonly)
  Returns the value of attribute existing_jid.
Instance Method Summary
- #check!(expiry = duplicate_key_ttl) ⇒ Object
  This method will return the jid that was set in redis.
- #delete! ⇒ Object
- #duplicate? ⇒ Boolean
- #duplicate_key_ttl ⇒ Object
- #idempotent? ⇒ Boolean
- #initialize(job, queue_name) ⇒ DuplicateJob (constructor)
  A new instance of DuplicateJob.
- #latest_wal_locations ⇒ Object
- #options ⇒ Object
- #perform(&block) ⇒ Object
  This will continue the server middleware chain if the job should be executed.
- #reschedule ⇒ Object
- #schedule(&block) ⇒ Object
  This will continue the middleware chain if the job should be scheduled. It will return false if the job needs to be cancelled.
- #scheduled? ⇒ Boolean
- #scheduled_at ⇒ Object
- #set_deduplicated_flag!(expiry = duplicate_key_ttl) ⇒ Object
- #should_reschedule? ⇒ Boolean
- #update_latest_wal_location! ⇒ Object
Constructor Details
#initialize(job, queue_name) ⇒ DuplicateJob
Returns a new instance of DuplicateJob.
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 29
def initialize(job, queue_name)
  @job = job
  @queue_name = queue_name
end
Instance Attribute Details
#existing_jid ⇒ Object
Returns the value of attribute existing_jid.
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 27
def existing_jid
  @existing_jid
end
Instance Method Details
#check!(expiry = duplicate_key_ttl) ⇒ Object
This method will return the jid that was set in redis.
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 48
def check!(expiry = duplicate_key_ttl)
  = {
    'jid' => jid,
    'offsets' => {},
    'wal_locations' => {},
    'existing_wal_locations' => job_wal_locations
  }

  # There are 3 possible scenarios. In order of decreasing likelihood:
  # 1. SET NX succeeds.
  # 2. SET NX fails, GET succeeds.
  # 3. SET NX fails, the key expires and GET fails. In this case we must retry.
  = {}
  while .empty?
    set_succeeded = with_redis { |r| r.set(, .to_msgpack, nx: true, ex: expiry) }
    = set_succeeded ? :
  end

  job['idempotency_key'] = idempotency_key

  self.existing_wal_locations = ['existing_wal_locations']
  self.existing_jid = ['jid']
end
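The three scenarios named in the comment above (SET NX succeeds; SET NX fails and GET succeeds; SET NX fails and GET finds an expired key) can be illustrated with a small in-memory stand-in for Redis. This is only a sketch of the retry pattern: `SketchStore`, `sketch_check`, `own_cookie` and `stored_cookie` are hypothetical names, and the store does not model TTLs or msgpack encoding.

```ruby
# Tiny in-memory stand-in for the two Redis calls the loop relies on.
# Hypothetical: a real client would talk to a Redis server.
class SketchStore
  def initialize
    @data = {}
  end

  # Mimics SET key value NX: writes only when the key is absent.
  def set_nx(key, value)
    return false if @data.key?(key)

    @data[key] = value
    true
  end

  def get(key)
    @data[key]
  end
end

# Try to claim the key. If another job already holds it, read that job's
# cookie instead. If the key vanished between the two calls, loop again.
def sketch_check(store, key, own_cookie)
  stored_cookie = nil

  while stored_cookie.nil?
    set_succeeded = store.set_nx(key, own_cookie)
    stored_cookie = set_succeeded ? own_cookie : store.get(key)
  end

  stored_cookie['jid']
end

store = SketchStore.new
first = sketch_check(store, 'dedup-key', { 'jid' => 'job-1' })
second = sketch_check(store, 'dedup-key', { 'jid' => 'job-2' })

puts first  # the first job claims the key, so its own jid comes back
puts second # the second job sees the jid of the job that already holds the key
```

Comparing the returned jid with the job's own jid is then enough to tell whether the job is a duplicate.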
#delete! ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 117
def delete!
  with_redis { |redis| redis.del() }
end
#duplicate? ⇒ Boolean
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 131
def duplicate?
  raise "Call `#check!` first to check for existing duplicates" unless existing_jid

  jid != existing_jid
end
#duplicate_key_ttl ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 175
def duplicate_key_ttl
  [:ttl] || DEFAULT_DUPLICATE_KEY_TTL
end
#idempotent? ⇒ Boolean
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 168
def idempotent?
  return false unless worker_klass
  return false unless worker_klass.respond_to?(:idempotent?)

  worker_klass.idempotent?
end
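The guard clauses in `#idempotent?` mean that a missing worker class, or a worker class that never declared idempotency, is treated as not idempotent. The sketch below reproduces that behaviour with two hypothetical worker classes; `SketchIdempotentWorker`, `SketchPlainWorker` and `sketch_idempotent?` are illustrative names only.

```ruby
# Hypothetical worker classes used only for this illustration.
class SketchIdempotentWorker
  def self.idempotent?
    true
  end
end

class SketchPlainWorker; end

# Same guard structure as the method above, taking the class as an argument.
def sketch_idempotent?(worker_klass)
  return false unless worker_klass
  return false unless worker_klass.respond_to?(:idempotent?)

  worker_klass.idempotent?
end

puts sketch_idempotent?(SketchIdempotentWorker) # declares idempotency
puts sketch_idempotent?(SketchPlainWorker)      # never declared it
puts sketch_idempotent?(nil)                    # class could not be resolved
```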
#latest_wal_locations ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 109
def latest_wal_locations
  return {} unless job_wal_locations.present?

  strong_memoize(:latest_wal_locations) do
    .fetch('wal_locations', {})
  end
end
#options ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 161
def
  return {} unless worker_klass
  return {} unless worker_klass.respond_to?(:get_deduplication_options)

  worker_klass.
end
#perform(&block) ⇒ Object
This will continue the server middleware chain if the job should be executed. It will return false if the job should not be executed.
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 43
def perform(&block)
  Strategies.for(strategy).new(self).perform(job, &block)
end
#reschedule ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 121
def reschedule
  Gitlab::SidekiqLogging::DeduplicationLogger.instance.rescheduled_log(job)

  worker_klass.perform_async(*arguments)
end
#schedule(&block) ⇒ Object
This will continue the middleware chain if the job should be scheduled. It will return false if the job needs to be cancelled.
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 36
def schedule(&block)
  Strategies.for(strategy).new(self).schedule(job, &block)
end
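Both `#perform` and `#schedule` look up a strategy class by name, wrap the `DuplicateJob` in it, and let the strategy decide whether to yield to the rest of the middleware chain. The following sketch shows one way such a dispatch could work. The `SketchStrategies` module, its two strategies, the registry and the `SketchJob` struct are all assumptions for illustration and are not the strategies shipped with GitLab.

```ruby
# Hypothetical strategies: each wraps the job handle and decides whether
# to yield to the rest of the middleware chain.
module SketchStrategies
  class None
    def initialize(duplicate_job)
      @duplicate_job = duplicate_job
    end

    # Always continue the chain.
    def schedule(_job)
      yield
    end
  end

  class DropDuplicates
    def initialize(duplicate_job)
      @duplicate_job = duplicate_job
    end

    # Continue the chain only when the job is not a duplicate;
    # otherwise return false to cancel it.
    def schedule(_job)
      return false if @duplicate_job.duplicate?

      yield
    end
  end

  REGISTRY = { none: None, drop_duplicates: DropDuplicates }.freeze

  # Unknown names fall back to the strategy that never deduplicates.
  def self.for(name)
    REGISTRY.fetch(name, None)
  end
end

# Minimal stand-in for a DuplicateJob that already knows its status.
SketchJob = Struct.new(:duplicate) do
  def duplicate?
    duplicate
  end
end

fresh = SketchJob.new(false)
repeat = SketchJob.new(true)

scheduled = SketchStrategies.for(:drop_duplicates).new(fresh).schedule({}) { :scheduled }
cancelled = SketchStrategies.for(:drop_duplicates).new(repeat).schedule({}) { :scheduled }
passthrough = SketchStrategies.for(:unknown).new(repeat).schedule({}) { :scheduled }
```

Keeping the decision inside the strategy object lets the middleware call `schedule` and `perform` the same way for every worker, whatever deduplication behaviour it has configured.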
#scheduled? ⇒ Boolean
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 127
def scheduled?
  scheduled_at.present?
end
#scheduled_at ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 157
def scheduled_at
  job['at']
end
#set_deduplicated_flag!(expiry = duplicate_key_ttl) ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 137
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
  return unless reschedulable?

  with_redis { |redis| redis.eval(DEDUPLICATED_SCRIPT, keys: []) }
end
#should_reschedule? ⇒ Boolean
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 153
def should_reschedule?
  reschedulable? && ['deduplicated'].present?
end
#update_latest_wal_location! ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 72
def update_latest_wal_location!
  return unless job_wal_locations.present?

  argv = []
  job_wal_locations.each do |connection_name, location|
    argv += [connection_name, pg_wal_lsn_diff(connection_name), location]
  end

  with_redis { |r| r.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [], argv: argv) }
end