Class: SidekiqUniqueJobs::Orphans::RubyReaper
- Defined in:
- lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Overview
this is a much slower version of the lua script but does not crash redis
Class DeleteOrphans provides deletion of orphaned digests
Constant Summary
Constants inherited from Reaper
SidekiqUniqueJobs::Orphans::Reaper::REAPERS
Instance Attribute Summary collapse
-
#digests ⇒ Object
readonly
Returns the value of attribute digests.
-
#retried ⇒ Object
readonly
Returns the value of attribute retried.
-
#scheduled ⇒ Object
readonly
Returns the value of attribute scheduled.
Attributes inherited from Reaper
Instance Method Summary collapse
-
#active?(digest) ⇒ Boolean
rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity.
-
#belongs_to_job?(digest) ⇒ true, false
Checks if the digest has a matching job.
-
#call ⇒ Integer
Delete orphaned digests.
- #considered_active?(time_f) ⇒ Boolean
-
#enqueued?(digest) ⇒ true
Checks if the digest exists in a Sidekiq::Queue.
-
#entries(conn, queue, &block) ⇒ Object
rubocop:disable Metrics/MethodLength.
-
#in_sorted_set?(key, digest) ⇒ true, false
Checks a sorted set for the existance of this digest.
-
#initialize(conn) ⇒ RubyReaper
constructor
Initialize a new instance of DeleteOrphans.
-
#orphans ⇒ Array<String>
Find orphaned digests.
-
#queues(conn) { ... } ⇒ void
Loops through all the redis queues and yields them one by one.
-
#retried?(digest) ⇒ true
Checks if the digest exists in the Sidekiq::RetrySet.
-
#scheduled?(digest) ⇒ true
Checks if the digest exists in the Sidekiq::ScheduledSet.
Methods inherited from Reaper
call, #config, #reaper, #reaper_count, #reaper_timeout
Methods included from JSON
Methods included from Logging
included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context
Methods included from Script::Caller
call_script, debug_lua, do_call, extract_args, max_history, now_f, redis_version
Methods included from Connection
Constructor Details
#initialize(conn) ⇒ RubyReaper
Initialize a new instance of DeleteOrphans
31 32 33 34 35 36 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 31 def initialize(conn) super(conn) @digests = SidekiqUniqueJobs::Digests.new @scheduled = Redis::SortedSet.new(SCHEDULE) @retried = Redis::SortedSet.new(RETRY) end |
Instance Attribute Details
#digests ⇒ Object (readonly)
Returns the value of attribute digests.
16 17 18 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 16 def digests @digests end |
#retried ⇒ Object (readonly)
Returns the value of attribute retried.
24 25 26 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 24 def retried @retried end |
#scheduled ⇒ Object (readonly)
Returns the value of attribute scheduled.
20 21 22 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 20 def scheduled @scheduled end |
Instance Method Details
#active?(digest) ⇒ Boolean
rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 120 def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity Sidekiq.redis do |conn| procs = conn.sscan_each("processes").to_a return false if procs.empty? procs.sort.each do |key| valid, workers = conn.pipelined do conn.exists(key) conn.hgetall("#{key}:workers") end next unless valid next unless workers.any? workers.each_pair do |_tid, job| item = load_json(job) return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest return true if considered_active?(item[CREATED_AT]) end end false end end |
#belongs_to_job?(digest) ⇒ true, false
Checks if the digest has a matching job.
1. It checks the scheduled set
2. It checks the retry set
3. It goes through all queues
75 76 77 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 75 def belongs_to_job?(digest) scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest) end |
#call ⇒ Integer
Delete orphaned digests
44 45 46 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 44 def call BatchDelete.call(orphans, conn) end |
#considered_active?(time_f) ⇒ Boolean
146 147 148 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 146 def considered_active?(time_f) (Time.now - reaper_timeout).to_f < time_f end |
#enqueued?(digest) ⇒ true
Checks if the digest exists in a Sidekiq::Queue
108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 108 def enqueued?(digest) Sidekiq.redis do |conn| queues(conn) do |queue| entries(conn, queue) do |entry| return true if entry.include?(digest) end end false end end |
#entries(conn, queue, &block) ⇒ Object
rubocop:disable Metrics/MethodLength
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 163 def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength queue_key = "queue:#{queue}" initial_size = conn.llen(queue_key) deleted_size = 0 page = 0 page_size = 50 loop do range_start = page * page_size - deleted_size range_end = range_start + page_size - 1 entries = conn.lrange(queue_key, range_start, range_end) page += 1 break if entries.empty? entries.each(&block) deleted_size = initial_size - conn.llen(queue_key) end end |
#in_sorted_set?(key, digest) ⇒ true, false
Checks a sorted set for the existance of this digest
194 195 196 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 194 def in_sorted_set?(key, digest) conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any? end |
#orphans ⇒ Array<String>
Find orphaned digests
54 55 56 57 58 59 60 61 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 54 def orphans conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result| next if belongs_to_job?(digest) result << digest break if result.size >= reaper_count end end |
#queues(conn) { ... } ⇒ void
This method returns an undefined value.
Loops through all the redis queues and yields them one by one
159 160 161 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 159 def queues(conn, &block) conn.sscan_each("queues", &block) end |
#retried?(digest) ⇒ true
Checks if the digest exists in the Sidekiq::RetrySet
97 98 99 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 97 def retried?(digest) in_sorted_set?(RETRY, digest) end |
#scheduled?(digest) ⇒ true
Checks if the digest exists in the Sidekiq::ScheduledSet
86 87 88 |
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 86 def scheduled?(digest) in_sorted_set?(SCHEDULE, digest) end |