Class: SidekiqUniqueJobs::Orphans::RubyReaper

Inherits:
Reaper
  • Object
show all
Defined in:
lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb

Overview

Note:

this is a much slower version of the lua script but does not crash redis

Class DeleteOrphans provides deletion of orphaned digests

Author:

Constant Summary

Constants inherited from Reaper

SidekiqUniqueJobs::Orphans::Reaper::REAPERS

Instance Attribute Summary collapse

Attributes inherited from Reaper

#conn

Instance Method Summary collapse

Methods inherited from Reaper

call, #config, #reaper, #reaper_count, #reaper_timeout

Methods included from JSON

dump_json, load_json

Methods included from Logging

included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, now_f, redis_version

Methods included from Connection

included, #redis

Constructor Details

#initialize(conn) ⇒ RubyReaper

Initialize a new instance of DeleteOrphans

Parameters:

  • conn (Redis)

    a connection to redis



31
32
33
34
35
36
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 31

def initialize(conn)
  super(conn)
  @digests   = SidekiqUniqueJobs::Digests.new
  @scheduled = Redis::SortedSet.new(SCHEDULE)
  @retried   = Redis::SortedSet.new(RETRY)
end

Instance Attribute Details

#digestsObject (readonly)

Returns the value of attribute digests.



16
17
18
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 16

def digests
  @digests
end

#retriedObject (readonly)

Returns the value of attribute retried.



24
25
26
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 24

def retried
  @retried
end

#scheduledObject (readonly)

Returns the value of attribute scheduled.



20
21
22
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 20

def scheduled
  @scheduled
end

Instance Method Details

#active?(digest) ⇒ Boolean

rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity

Returns:

  • (Boolean)


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 120

def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a
    return false if procs.empty?

    procs.sort.each do |key|
      valid, workers = conn.pipelined do
        conn.exists(key)
        conn.hgetall("#{key}:workers")
      end

      next unless valid
      next unless workers.any?

      workers.each_pair do |_tid, job|
        item = load_json(job)

        return true if item.dig(PAYLOAD, LOCK_DIGEST) == digest
        return true if considered_active?(item[CREATED_AT])
      end
    end

    false
  end
end

#belongs_to_job?(digest) ⇒ true, false

Checks if the digest has a matching job.

1. It checks the scheduled set
2. It checks the retry set
3. It goes through all queues

Parameters:

  • digest (String)

    the digest to search for

Returns:

  • (true)

    when either of the checks return true

  • (false)

    when no job was found for this digest



75
76
77
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 75

def belongs_to_job?(digest)
  scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end

#callInteger

Delete orphaned digests

Returns:

  • (Integer)

    the number of reaped locks



44
45
46
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 44

def call
  BatchDelete.call(orphans, conn)
end

#considered_active?(time_f) ⇒ Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 146

def considered_active?(time_f)
  (Time.now - reaper_timeout).to_f < time_f
end

#enqueued?(digest) ⇒ true

Checks if the digest exists in a Sidekiq::Queue

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in any queue



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 108

def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        return true if entry.include?(digest)
      end
    end

    false
  end
end

#entries(conn, queue, &block) ⇒ Object

rubocop:disable Metrics/MethodLength



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 163

def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    break if entries.empty?

    entries.each(&block)

    deleted_size = initial_size - conn.llen(queue_key)
  end
end

#in_sorted_set?(key, digest) ⇒ true, false

Checks a sorted set for the existance of this digest

Parameters:

  • key (String)

    the key for the sorted set

  • digest (String)

    the digest to scan for

Returns:

  • (true)

    when found

  • (false)

    when missing



194
195
196
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 194

def in_sorted_set?(key, digest)
  conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end

#orphansArray<String>

Find orphaned digests

Returns:

  • (Array<String>)

    an array of orphaned digests



54
55
56
57
58
59
60
61
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 54

def orphans
  conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result|
    next if belongs_to_job?(digest)

    result << digest
    break if result.size >= reaper_count
  end
end

#queues(conn) { ... } ⇒ void

This method returns an undefined value.

Loops through all the redis queues and yields them one by one

Parameters:

  • conn (Redis)

    the connection to use for fetching queues

Yields:

  • queues one at a time



159
160
161
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 159

def queues(conn, &block)
  conn.sscan_each("queues", &block)
end

#retried?(digest) ⇒ true

Checks if the digest exists in the Sidekiq::RetrySet

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in retry set



97
98
99
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 97

def retried?(digest)
  in_sorted_set?(RETRY, digest)
end

#scheduled?(digest) ⇒ true

Checks if the digest exists in the Sidekiq::ScheduledSet

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in scheduled set



86
87
88
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 86

def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end