Class: SidekiqUniqueJobs::Digests

Inherits:
Redis::SortedSet
Defined in:
lib/sidekiq_unique_jobs/digests.rb

Overview

Class Digests provides access to the unique digests stored in a Redis sorted set

Author:

Direct Known Subclasses

ExpiringDigests

Constant Summary

DEFAULT_COUNT = 1_000

Returns the number of matches to return by default.

Returns:

  • (Integer)

    the number of matches to return by default

SCAN_PATTERN = "*"

Returns the default pattern to use for matching.

Returns:

  • (String)

    the default pattern to use for matching

EMPTY_KEYS_SEGMENT = ["", "", "", ""].freeze

Returns the empty runtime or queuetime keys.

Returns:

  • (Array(String, String, String, String))

    the empty runtime or queuetime keys

Instance Attribute Summary

Attributes inherited from Redis::Entity

#key

Instance Method Summary

Methods inherited from Redis::SortedSet

#byscore, #clear, #count, #rank, #score

Methods inherited from Redis::Entity

#count, #exist?, #expires?, #pttl, #ttl

Methods included from Timing

clock_stamp, now_f, time_source, timed

Methods included from JSON

dump_json, load_json, safe_load_json

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version

Methods included from Logging

#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Constructor Details

#initialize(digests_key = DIGESTS) ⇒ Digests

Returns a new instance of Digests.



# File 'lib/sidekiq_unique_jobs/digests.rb', line 20

def initialize(digests_key = DIGESTS)
  super(digests_key)
end

Instance Method Details

#add(digest) ⇒ Object

Adds a digest to the sorted set, scored with the current time.

Parameters:

  • digest (String)

    the digest to add



# File 'lib/sidekiq_unique_jobs/digests.rb', line 29

def add(digest)
  redis { |conn| conn.zadd(key, now_f, digest) }
end
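
Because the digest is written with ZADD and the current time as its score, adding the same digest twice should leave a single member with a refreshed score. The following is a minimal in-memory sketch of that behavior. `FakeSortedSet` and the digest string are hypothetical stand-ins for illustration; they are not part of the gem and no Redis connection is involved.

```ruby
# Hypothetical in-memory stand-in for a Redis sorted set (not part of the gem).
class FakeSortedSet
  def initialize
    @members = {} # member => score
  end

  # Mirrors basic ZADD semantics: insert the member or overwrite its score.
  def zadd(member, score)
    @members[member] = score
  end

  # Number of members, like ZCARD.
  def zcard
    @members.size
  end

  # Score of a member (nil when absent), like ZSCORE.
  def zscore(member)
    @members[member]
  end
end

set = FakeSortedSet.new
set.zadd("uniquejobs:abc", 100.0) # first add, scored with a timestamp
set.zadd("uniquejobs:abc", 250.5) # same digest again: only the score changes
puts set.zcard
puts set.zscore("uniquejobs:abc")
```

The score therefore acts as a "last added at" timestamp for each digest.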

#delete_by_digest(digest, queuetime: true, runtime: true) ⇒ Object

Deletes a unique digest.

Also deletes the related keys (:AVAILABLE, :EXPIRED, etc.).

Parameters:

  • digest (String)

    a unique digest to delete

  • queuetime (Boolean) (defaults to: true)

    Whether to delete queue locks.

  • runtime (Boolean) (defaults to: true)

    Whether to delete run locks.



# File 'lib/sidekiq_unique_jobs/digests.rb', line 56

def delete_by_digest(digest, queuetime: true, runtime: true)
  result, elapsed = timed do
    call_script(
      :delete_by_digest,
      queuetime_keys(queuetime ? digest : nil) + runtime_keys(runtime ? digest : nil) + [key],
    )
  end

  log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")

  result
end
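
The key list passed to the script is built from two four-element segments plus the sorted-set key, so disabling `queuetime` or `runtime` substitutes EMPTY_KEYS_SEGMENT and keeps the list at nine entries. The sketch below illustrates this. The bodies of `queuetime_keys` and `runtime_keys` and the key suffixes are assumptions made for illustration (the source above does not show them), and `script_keys` is a hypothetical helper.

```ruby
# Illustrative only: helper bodies and key suffixes are assumptions,
# not copied from the gem.
EMPTY_KEYS_SEGMENT = ["", "", "", ""].freeze

def queuetime_keys(digest)
  return EMPTY_KEYS_SEGMENT unless digest

  [digest, "#{digest}:QUEUED", "#{digest}:PRIMED", "#{digest}:LOCKED"]
end

def runtime_keys(digest)
  return EMPTY_KEYS_SEGMENT unless digest

  ["#{digest}:RUN", "#{digest}:RUN:QUEUED", "#{digest}:RUN:PRIMED", "#{digest}:RUN:LOCKED"]
end

# Hypothetical helper that assembles the list the same way #delete_by_digest does.
def script_keys(digest, key, queuetime: true, runtime: true)
  queuetime_keys(queuetime ? digest : nil) + runtime_keys(runtime ? digest : nil) + [key]
end

# With runtime: false, positions 4..7 are empty placeholders.
p script_keys("uniquejobs:abc", "uniquejobs:digests", runtime: false)
```

Keeping the positions fixed presumably lets the script address each key by index regardless of which lock types are being deleted.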

#delete_by_pattern(pattern, count: DEFAULT_COUNT) ⇒ Hash<String,Float>

Deletes unique digests by pattern

Parameters:

  • pattern (String)

    a key pattern to match with

  • count (Integer) (defaults to: DEFAULT_COUNT)

    the maximum number of entries to match

Returns:

  • (Hash<String,Float>)

    Hash mapping of digest matching the given pattern and score



# File 'lib/sidekiq_unique_jobs/digests.rb', line 39

def delete_by_pattern(pattern, count: DEFAULT_COUNT)
  result, elapsed = timed do
    digests = entries(pattern: pattern, count: count).keys
    redis { |conn| BatchDelete.call(digests, conn) }
  end

  log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")

  result
end

#entries(pattern: SCAN_PATTERN, count: DEFAULT_COUNT) ⇒ Hash

The entries in this sorted set

Parameters:

  • pattern (String) (defaults to: SCAN_PATTERN)

    the match pattern to search for

  • count (Integer) (defaults to: DEFAULT_COUNT)

    the number of entries to return

Returns:

  • (Hash)

    a hash mapping each digest that matches the given pattern to its score



# File 'lib/sidekiq_unique_jobs/digests.rb', line 77

def entries(pattern: SCAN_PATTERN, count: DEFAULT_COUNT)
  redis { |conn| conn.zscan(key, match: pattern, count: count).to_a }.to_h
end
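
Since the scan result is converted with `to_h`, callers such as #delete_by_pattern can take `.keys` to get just the digests. Below is a minimal sketch of that shape using an in-memory list of pairs. `fake_entries` is a hypothetical helper, `File.fnmatch` only approximates Redis glob matching, and `count` is treated as a hard cap for simplicity (Redis treats COUNT as a hint for how much work to do per call).

```ruby
# Hypothetical stand-in: filters [member, score] pairs the way a pattern scan
# over the sorted set might, then converts them to a Hash.
def fake_entries(pairs, pattern: "*", count: 1_000)
  pairs.select { |member, _score| File.fnmatch(pattern, member) }
       .first(count) # simplification: hard cap instead of a scan hint
       .to_h
end

pairs = [
  ["uniquejobs:a1", 1.0],
  ["uniquejobs:b1", 2.0],
  ["uniquejobs:a2", 3.0],
]

matches = fake_entries(pairs, pattern: "uniquejobs:a*")
p matches      # digest => score
p matches.keys # just the digests, as #delete_by_pattern uses them
```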

#page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100) ⇒ Array<Integer, Integer, Array<Lock>>

Returns one page of locks matching the given pattern

Parameters:

  • cursor (Integer) (defaults to: 0)

    the cursor for this iteration

  • pattern (String) (defaults to: SCAN_PATTERN)

    the match pattern to search for

  • page_size (Integer) (defaults to: 100)

    the size per page

Returns:

  • (Array<Integer, Integer, Array<Lock>>)

    total_size, next_cursor, locks



# File 'lib/sidekiq_unique_jobs/digests.rb', line 90

def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100)
  redis do |conn|
    total_size, digests = conn.multi do |pipeline|
      pipeline.zcard(key)
      pipeline.zscan(key, cursor, match: pattern, count: page_size)
    end

    # NOTE: When debugging, check the last item in the returned array.
    [
      total_size.to_i,
      digests[0].to_i, # next_cursor
      digests[1].each_slice(2).map { |digest, score| Lock.new(digest, time: score) }, # entries
    ]
  end
end
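
The method unpacks two replies: the cardinality and a scan reply that it treats as `[next_cursor, [member, score, member, score, ...]]`. The sketch below reproduces only that unpacking step on a hand-written reply. `FakeLock` and `unpack_page` are hypothetical names, and the reply shape is an assumption read off the `each_slice(2)` call above.

```ruby
# Hypothetical stand-in for the gem's Lock class.
FakeLock = Struct.new(:digest, :time)

# Unpacks the two replies the way #page does, assuming the scan reply is
# [next_cursor, [member, score, member, score, ...]].
def unpack_page(total_size, scan_reply)
  next_cursor, flat = scan_reply
  locks = flat.each_slice(2).map { |digest, score| FakeLock.new(digest, score) }
  [total_size.to_i, next_cursor.to_i, locks]
end

total, cursor, locks = unpack_page(2, ["0", ["uniquejobs:a", "1.5", "uniquejobs:b", "2.5"]])
puts total
puts cursor
locks.each { |lock| puts "#{lock.digest} #{lock.time}" }
```

In Redis cursor-based scans, a returned cursor of 0 means the iteration is complete, so a caller would keep passing `next_cursor` back as `cursor:` until it is 0.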