Class: Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob

Inherits:
Object
  • Object
show all
Includes:
Utils::StrongMemoize
Defined in:
lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb

Overview

This class defines an identifier of a job in a queue. The identifier is based on a job's class and arguments.

A strategy decides when to keep track of the job in Redis and when to remove it.

Storing the deduplication key in Redis can be done by calling `check!`. `check!` returns the `jid` of the job if it was scheduled, or the `jid` of the duplicate job if it was already scheduled.

When new jobs can be scheduled again, the strategy calls `#delete!`.

Constant Summary collapse

# Expiry used for the deduplication cookie when the worker's deduplication
# options do not supply a :ttl (see #duplicate_key_ttl).
DEFAULT_DUPLICATE_KEY_TTL = 6.hours

# Strategy identifiers.
# NOTE(review): presumably DEFAULT_STRATEGY is the fallback when a worker does
# not pick a strategy and STRATEGY_NONE disables deduplication; their use is
# not visible in this listing, so confirm against the callers.
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none

# Lua script evaluated by #update_latest_wal_location!.
#
# KEYS[1] is the cookie key. ARGV is a flat list of
# (connection name, offset, WAL location) triples. The script does nothing
# when the cookie does not exist. Otherwise, for each triple it overwrites the
# stored offset and WAL location when no offset is stored for that connection
# or the supplied offset is greater, then writes the cookie back with
# "keepttl" so the key's remaining expiry is preserved.
UPDATE_WAL_COOKIE_SCRIPT = <<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)

  for i = 1, #ARGV, 3 do
    local connection = ARGV[i]
    local current_offset = cookie.offsets[connection]
    local new_offset = tonumber(ARGV[i+1])
    if not current_offset or (new_offset and current_offset < new_offset) then
      cookie.offsets[connection] = new_offset
      cookie.wal_locations[connection] = ARGV[i+2]
    end
  end

  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "keepttl")
LUA

# Lua script evaluated by #set_deduplicated_flag!.
#
# KEYS[1] is the cookie key. The script does nothing when the cookie does not
# exist; otherwise it sets the cookie's `deduplicated` field to "1" and writes
# the cookie back with "keepttl" so the key's remaining expiry is preserved.
DEDUPLICATED_SCRIPT = <<~LUA
  local cookie_msgpack = redis.call("get", KEYS[1])
  if not cookie_msgpack then
    return
  end
  local cookie = cmsgpack.unpack(cookie_msgpack)
  cookie.deduplicated = "1"
  redis.call("set", KEYS[1], cmsgpack.pack(cookie), "keepttl")
LUA

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job, queue_name) ⇒ DuplicateJob

Returns a new instance of DuplicateJob.



29
30
31
32
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 29

# Stores the job payload and the queue name on the instance.
#
# @param job [Hash] job payload; other methods read and write string keys
#   on it (e.g. 'at', 'idempotency_key')
# @param queue_name [String] name of the queue the job belongs to
def initialize(job, queue_name)
  @queue_name = queue_name
  @job = job
end

Instance Attribute Details

#existing_jid ⇒ Object

Returns the value of attribute existing_jid.



27
28
29
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 27

# Reader for the jid recorded by #check!; nil until #check! has been called.
def existing_jid = @existing_jid

Instance Method Details

#check!(expiry = duplicate_key_ttl) ⇒ Object

This method will return the jid that was set in redis



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 48

# Tries to store this job's cookie under the deduplication key and records
# which job currently owns that key.
#
# Side effects: writes job['idempotency_key'] and assigns
# existing_wal_locations and existing_jid from the cookie that ended up in
# Redis.
#
# @param expiry expiry passed as `ex:` to the Redis SET call
# @return the jid stored in the cookie that owns the key
def check!(expiry = duplicate_key_ttl)
  own_cookie = {
    'jid' => jid,
    'offsets' => {},
    'wal_locations' => {},
    'existing_wal_locations' => job_wal_locations
  }

  # Three outcomes are possible, in order of decreasing likelihood:
  # 1. SET NX succeeds, so our own cookie is the stored one.
  # 2. SET NX fails and GET returns the cookie that is already stored.
  # 3. SET NX fails, the key expires and GET comes back empty; try again.
  stored_cookie = nil
  loop do
    created = with_redis do |redis|
      redis.set(cookie_key, own_cookie.to_msgpack, nx: true, ex: expiry)
    end

    stored_cookie =
      if created
        own_cookie
      else
        get_cookie
      end

    break unless stored_cookie.empty?
  end

  job['idempotency_key'] = idempotency_key

  self.existing_wal_locations = stored_cookie['existing_wal_locations']
  self.existing_jid = stored_cookie['jid']
end

#delete! ⇒ Object



117
118
119
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 117

# Deletes the deduplication cookie key from Redis.
#
# @return whatever the Redis DEL call returns
def delete!
  with_redis do |connection|
    connection.del(cookie_key)
  end
end

#duplicate? ⇒ Boolean

Returns:

  • (Boolean)


131
132
133
134
135
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 131

# Tells whether the key is owned by a job other than this one.
#
# @return [Boolean] true when this job's jid differs from existing_jid
# @raise [RuntimeError] when existing_jid is not set, i.e. #check! has not
#   been called yet
def duplicate?
  owner_jid = existing_jid
  raise "Call `#check!` first to check for existing duplicates" unless owner_jid

  jid != owner_jid
end

#duplicate_key_ttl ⇒ Object



175
176
177
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 175

# Expiry to use for the deduplication key.
#
# @return the :ttl from the deduplication options when it is truthy,
#   otherwise DEFAULT_DUPLICATE_KEY_TTL
def duplicate_key_ttl
  configured_ttl = options[:ttl]
  configured_ttl || DEFAULT_DUPLICATE_KEY_TTL
end

#idempotent? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
171
172
173
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 168

# Asks the worker class whether it is idempotent.
#
# @return [Boolean] false when there is no worker class or it does not
#   respond to `idempotent?`; otherwise the worker class's own answer
def idempotent?
  klass = worker_klass
  return false unless klass&.respond_to?(:idempotent?)

  klass.idempotent?
end

#latest_wal_locations ⇒ Object



109
110
111
112
113
114
115
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 109

# WAL locations currently stored in the cookie.
#
# The cookie lookup is memoized under :latest_wal_locations, so the cookie is
# read at most once per instance.
#
# @return [Hash] empty when the job carries no WAL locations or the cookie
#   has no 'wal_locations' entry
def latest_wal_locations
  return {} if job_wal_locations.blank?

  strong_memoize(:latest_wal_locations) { get_cookie.fetch('wal_locations', {}) }
end

#options ⇒ Object



161
162
163
164
165
166
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 161

# Deduplication options declared by the worker class.
#
# @return [Hash] empty when there is no worker class or it does not respond
#   to `get_deduplication_options`; otherwise that method's result
def options
  klass = worker_klass
  return {} unless klass&.respond_to?(:get_deduplication_options)

  klass.get_deduplication_options
end

#perform(&block) ⇒ Object

This will continue the server middleware chain if the job should be executed. It will return false if the job should not be executed.



43
44
45
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 43

# Hands execution of the job to the strategy selected by `strategy`.
#
# Looks up the strategy class, instantiates it with this object, and calls
# its #perform with the job and the given block.
#
# @return whatever the strategy's #perform returns
def perform(&block)
  strategy_class = Strategies.for(strategy)
  handler = strategy_class.new(self)
  handler.perform(job, &block)
end

#reschedule ⇒ Object



121
122
123
124
125
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 121

# Logs the reschedule through the deduplication logger, then enqueues the
# worker again with this job's arguments.
#
# @return whatever the worker class's `perform_async` returns
def reschedule
  dedup_logger = Gitlab::SidekiqLogging::DeduplicationLogger.instance
  dedup_logger.rescheduled_log(job)

  worker_klass.perform_async(*arguments)
end

#schedule(&block) ⇒ Object

This will continue the middleware chain if the job should be scheduled. It will return false if the job needs to be cancelled.



36
37
38
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 36

# Hands scheduling of the job to the strategy selected by `strategy`.
#
# Looks up the strategy class, instantiates it with this object, and calls
# its #schedule with the job and the given block.
#
# @return whatever the strategy's #schedule returns
def schedule(&block)
  strategy_class = Strategies.for(strategy)
  handler = strategy_class.new(self)
  handler.schedule(job, &block)
end

#scheduled? ⇒ Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 127

# Whether the job carries a scheduled time.
#
# @return [Boolean] true when `scheduled_at` is not blank
def scheduled?
  !scheduled_at.blank?
end

#scheduled_at ⇒ Object



157
158
159
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 157

# Value stored under the job payload's 'at' key (nil when absent).
def scheduled_at = job['at']

#set_deduplicated_flag!(expiry = duplicate_key_ttl) ⇒ Object



137
138
139
140
141
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 137

# Marks the stored cookie as deduplicated by evaluating DEDUPLICATED_SCRIPT
# against the cookie key. Does nothing unless the job is reschedulable.
#
# @param expiry accepted for callers, but not referenced by this method
# @return nil when the job is not reschedulable, otherwise the result of the
#   Redis EVAL call
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
  return unless reschedulable?

  with_redis do |connection|
    connection.eval(DEDUPLICATED_SCRIPT, keys: [cookie_key])
  end
end

#should_reschedule? ⇒ Boolean

Returns:

  • (Boolean)


153
154
155
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 153

# Whether the job should be enqueued again: it must be reschedulable and the
# stored cookie must carry a non-blank 'deduplicated' entry.
#
# The cookie is only read when the job is reschedulable.
def should_reschedule?
  reschedulable? &&
    !get_cookie['deduplicated'].blank?
end

#update_latest_wal_location! ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 72

# Pushes this job's WAL locations into the stored cookie by evaluating
# UPDATE_WAL_COOKIE_SCRIPT against the cookie key.
#
# Does nothing when the job carries no WAL locations. The script arguments
# are a flat list of (connection name, WAL offset, WAL location) triples, one
# triple per entry in `job_wal_locations`.
#
# @return nil when there are no WAL locations, otherwise the result of the
#   Redis EVAL call
def update_latest_wal_location!
  return unless job_wal_locations.present?

  # Build the flat argument list in one pass instead of re-allocating the
  # array with `+=` on every iteration.
  argv = job_wal_locations.flat_map do |connection_name, location|
    [connection_name, pg_wal_lsn_diff(connection_name), location]
  end

  with_redis { |r| r.eval(UPDATE_WAL_COOKIE_SCRIPT, keys: [cookie_key], argv: argv) }
end