Class: Sidekiq::Throttled::Strategy::Threshold

Inherits:
Object
  • Object
Includes:
Base
Defined in:
lib/sidekiq/throttled/strategy/threshold.rb

Overview

TODO:

Use redis TIME command instead of sending current timestamp from sidekiq manager. See: redis.io/commands/time
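As a rough sketch of what this TODO implies: the Redis TIME command replies with a two-element array holding the Unix time in seconds and the microseconds elapsed in the current second. The helper name `redis_time_to_f` below is hypothetical and not part of the library. It only illustrates how such a reply could be turned into the float timestamp that is currently taken from `Time.now.to_f`.

```ruby
# Hypothetical helper, not part of sidekiq-throttled.
# Converts a Redis TIME reply ([seconds, microseconds]) into a Float
# Unix timestamp. Elements may arrive as Strings or Integers depending
# on the client, so both are normalized with Integer().
def redis_time_to_f(reply)
  seconds, microseconds = reply
  Integer(seconds) + Integer(microseconds) / 1_000_000.0
end

# Sample reply shaped like the output of the TIME command.
server_now = redis_time_to_f(%w[1700000000 250000])
```

Taking the timestamp from the Redis server would give every Sidekiq process the same clock, so clock drift between hosts could no longer skew the window.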

Threshold throttling strategy

Instance Method Summary collapse

Methods included from Base

#limit

Constructor Details

#initialize(strategy_key, limit:, period:, key_suffix: nil) ⇒ Threshold

Returns a new instance of Threshold.

Parameters:

  • strategy_key (#to_s)
  • limit (#to_i, #call)

    Maximum number of jobs allowed per period for the given key.

  • period (#to_f, #call)

    Period in seconds.

  • key_suffix (Proc) (defaults to: nil)

    Dynamic key suffix generator.



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 41

def initialize(strategy_key, limit:, period:, key_suffix: nil)
  @base_key   = "#{strategy_key}:threshold"
  @limit      = limit
  @period     = period
  @key_suffix = key_suffix
end

Instance Method Details

#count(*job_args) ⇒ Integer

Returns the current count of jobs.

Returns:

  • (Integer)

    Current count of jobs



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 92

def count(*job_args)
  Sidekiq.redis { |conn| conn.llen(key(job_args)) }.to_i
end

#dynamic? ⇒ Boolean

Returns whether the strategy has dynamic config.

Returns:

  • (Boolean)

    Whether the strategy has dynamic config



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 56

def dynamic?
  @key_suffix || @limit.respond_to?(:call) || @period.respond_to?(:call)
end

#period(job_args = nil) ⇒ Float

Returns the period in seconds.

Returns:

  • (Float)

    Period in seconds



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 49

def period(job_args = nil)
  return @period.to_f unless @period.respond_to? :call

  @period.call(*job_args).to_f
end
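
The static-or-callable lookup above can be sketched in isolation. `resolve_period` is an illustrative stand-in rather than library code, and the "bulk" rule is a made-up example of a dynamic period.

```ruby
# Illustrative stand-in for the lookup in #period; not library code.
# `setting` is either a plain value or a callable that receives the
# job arguments.
def resolve_period(setting, job_args = nil)
  return setting.to_f unless setting.respond_to?(:call)

  setting.call(*job_args).to_f
end

# A static period is simply coerced to Float.
static_period = resolve_period(60)

# Hypothetical rule: a longer window for jobs flagged as "bulk".
bulk_rule = ->(kind) { kind == "bulk" ? 3600 : 60 }
dynamic_period = resolve_period(bulk_rule, ["bulk"])
```

Because the callable receives the splatted job arguments, its arity has to match the arguments the job is enqueued with.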

#reset!(*job_args) ⇒ void

This method returns an undefined value.

Resets the count of jobs.



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 98

def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }
end

#retry_in(*job_args) ⇒ Float

Returns how long, in seconds, before we’ll next be able to take on jobs.

Returns:

  • (Float)

    How long, in seconds, before we’ll next be able to take on jobs



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 73

def retry_in(*job_args)
  job_limit = limit(job_args)
  return 0.0 if !job_limit || count(*job_args) < job_limit

  job_period = period(job_args)
  job_key = key(job_args)
  time_since_oldest = Time.now.to_f - Sidekiq.redis { |redis| redis.lindex(job_key, -1) }.to_f
  if time_since_oldest > job_period
    # The oldest job on our list is from more than the throttling period ago,
    # which means we have not hit the limit this period.
    0.0
  else
    # If we can only have X jobs every Y seconds, then wait until Y seconds have elapsed
    # since the oldest job on our list.
    job_period - time_since_oldest
  end
end
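
The arithmetic in #retry_in can be followed without Redis. The sketch below is an in-memory illustration, not library code: `retry_in_sketch` and its keyword arguments are hypothetical, and a plain array (newest first) stands in for the Redis list.

```ruby
# In-memory illustration of the arithmetic in #retry_in; not library code.
# `timestamps` stands in for the Redis list, newest first, so the last
# element is the oldest recorded job (what LINDEX key -1 would return).
def retry_in_sketch(timestamps, limit:, period:, now:)
  # Below the limit: a job can be taken immediately.
  return 0.0 if timestamps.length < limit

  time_since_oldest = now - timestamps.last
  # The oldest entry has already left the window.
  return 0.0 if time_since_oldest > period

  # Wait until the oldest entry is a full period old.
  period - time_since_oldest
end

# Limit of 3 jobs per 60 seconds; the oldest job ran 45 seconds ago.
wait = retry_in_sketch([100.0, 80.0, 55.0], limit: 3, period: 60.0, now: 100.0)
```

Here `wait` is 15.0: the oldest of the three recorded jobs needs another 15 seconds to reach the 60-second mark.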

#throttled?(*job_args) ⇒ Boolean

Returns whether the job is throttled.

Returns:

  • (Boolean)

    whether the job is throttled



# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 61

def throttled?(*job_args)
  job_limit = limit(job_args)
  return false unless job_limit
  return true if job_limit <= 0

  keys = [key(job_args)]
  argv = [job_limit, period(job_args), Time.now.to_f]

  Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end
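
The Lua source behind SCRIPT is not shown in this section, so the following is only an assumption about its behavior, inferred from how #count and #retry_in read the list: a job is throttled when the list already holds `limit` timestamps and the oldest one is still inside the period. `ThresholdSketch` is a hypothetical in-memory model, not the library's implementation.

```ruby
# In-memory approximation of a threshold check. This is an assumption
# about what SCRIPT does, not the library's implementation.
# Assumes `limit` is positive (the real method returns early otherwise).
class ThresholdSketch
  def initialize(limit:, period:)
    @limit = limit
    @period = period
    @timestamps = [] # newest first, like a list filled from the head
  end

  # Returns true when a job arriving at time `now` should be throttled.
  def throttled?(now)
    full = @timestamps.length >= @limit
    return true if full && now - @timestamps.last < @period

    @timestamps.unshift(now)                # record this job
    @timestamps = @timestamps.first(@limit) # keep the newest `limit` entries
    false
  end
end

window = ThresholdSketch.new(limit: 2, period: 10.0)
results = [0.0, 1.0, 2.0, 11.0].map { |now| window.throttled?(now) }
```

With a limit of 2 per 10 seconds, the first two jobs pass, the third (at 2.0) is throttled, and the job at 11.0 passes again because the oldest recorded entry is by then more than 10 seconds old.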