Class: Sidekiq::Throttled::Strategy::Threshold
- Inherits:
- Object
- Sidekiq::Throttled::Strategy::Threshold
- Includes:
- Base
- Defined in:
- lib/sidekiq/throttled/strategy/threshold.rb
Overview
TODO:
Use the Redis TIME command instead of sending the current timestamp from the Sidekiq manager. See: redis.io/commands/time
Threshold throttling strategy
Instance Method Summary
-
#count(*job_args) ⇒ Integer
Current count of jobs.
-
#dynamic? ⇒ Boolean
Whether the strategy has a dynamic config.
-
#initialize(strategy_key, limit:, period:, key_suffix: nil) ⇒ Threshold
constructor
A new instance of Threshold.
-
#period(job_args = nil) ⇒ Float
Period in seconds.
-
#reset!(*job_args) ⇒ void
Resets count of jobs.
-
#retry_in(*job_args) ⇒ Float
How long, in seconds, before we’ll next be able to take on jobs.
-
#throttled?(*job_args) ⇒ Boolean
Whether the job is throttled.
Methods included from Base
Constructor Details
#initialize(strategy_key, limit:, period:, key_suffix: nil) ⇒ Threshold
Returns a new instance of Threshold.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 41
def initialize(strategy_key, limit:, period:, key_suffix: nil)
  @base_key = "#{strategy_key}:threshold"
  @limit = limit
  @period = period
  @key_suffix = key_suffix
end
Instance Method Details
#count(*job_args) ⇒ Integer
Returns the current count of jobs.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 92
def count(*job_args)
  Sidekiq.redis { |conn| conn.llen(key(job_args)) }.to_i
end
#dynamic? ⇒ Boolean
Returns whether the strategy has a dynamic config.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 56
def dynamic?
  @key_suffix || @limit.respond_to?(:call) || @period.respond_to?(:call)
end
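The check above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical free-standing method `dynamic_sketch?` (not part of the gem) that takes the three settings as keyword arguments. It illustrates that the expression yields a truthy or falsy value: when a key suffix is set, the suffix object itself is returned rather than a strict `true`.

```ruby
# Hypothetical stand-in for #dynamic?; not part of sidekiq-throttled.
def dynamic_sketch?(key_suffix:, limit:, period:)
  # Truthy when a key suffix is set, or when limit or period is callable.
  key_suffix || limit.respond_to?(:call) || period.respond_to?(:call)
end

static_config   = dynamic_sketch?(key_suffix: nil, limit: 5, period: 60)
callable_limit  = dynamic_sketch?(key_suffix: nil, limit: -> { 5 }, period: 60)
suffix          = ->(user_id) { user_id }
with_key_suffix = dynamic_sketch?(key_suffix: suffix, limit: 5, period: 60)
```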
#period(job_args = nil) ⇒ Float
Returns the period in seconds.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 49
def period(job_args = nil)
  return @period.to_f unless @period.respond_to? :call

  @period.call(*job_args).to_f
end
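To illustrate how a static period and a callable period resolve, here is a small sketch. The method `resolve_period` and the lambda arguments (`_user_id`, `plan`) are hypothetical and exist only for this example; the logic mirrors the method body shown above.

```ruby
# Hypothetical stand-in for #period; not part of sidekiq-throttled.
def resolve_period(period, job_args = nil)
  # A static value is simply coerced to Float.
  return period.to_f unless period.respond_to?(:call)

  # A callable receives the job arguments splatted, then the result is coerced.
  period.call(*job_args).to_f
end

static_period  = resolve_period(60)
dynamic_period = resolve_period(->(_user_id, plan) { plan == "pro" ? 10 : 60 }, [42, "pro"])
```

Because the job arguments are splatted, a callable period should accept the same positional arguments the job itself receives.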
#reset!(*job_args) ⇒ void
This method returns an undefined value.
Resets count of jobs
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 98
def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }
end
#retry_in(*job_args) ⇒ Float
Returns how long, in seconds, before we’ll next be able to take on jobs.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 73
def retry_in(*job_args)
  job_limit = limit(job_args)
  return 0.0 if !job_limit || count(*job_args) < job_limit

  job_period = period(job_args)
  job_key = key(job_args)
  time_since_oldest = Time.now.to_f - Sidekiq.redis { |redis| redis.lindex(job_key, -1) }.to_f
  if time_since_oldest > job_period
    # The oldest job on our list is from more than the throttling period ago,
    # which means we have not hit the limit this period.
    0.0
  else
    # If we can only have X jobs every Y minutes, then wait until Y minutes have elapsed
    # since the oldest job on our list.
    job_period - time_since_oldest
  end
end
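The arithmetic in this method can be followed with plain numbers. The sketch below is a hypothetical pure-Ruby version (`retry_in_sketch` is not part of the gem) in which an array stands in for the Redis list. It assumes the list is ordered newest first, so its last element plays the role of `lindex(job_key, -1)`, the oldest recorded job.

```ruby
# Hypothetical pure-Ruby version of the arithmetic in #retry_in.
# `timestamps` stands in for the Redis list, newest first.
def retry_in_sketch(timestamps, limit:, period:, now:)
  # No limit configured, or the list is not yet full: no need to wait.
  return 0.0 if !limit || timestamps.length < limit

  time_since_oldest = now - timestamps.last.to_f
  # The oldest entry has already aged out of the period.
  return 0.0 if time_since_oldest > period

  # Otherwise wait until the oldest entry is a full period old.
  period - time_since_oldest
end

# Limit of 3 jobs per 60 seconds; the list is full and the oldest entry is 45 s old.
wait_full    = retry_in_sketch([130.0, 120.0, 100.0], limit: 3, period: 60.0, now: 145.0)
# Only 2 of 3 slots are used.
wait_partial = retry_in_sketch([130.0, 120.0], limit: 3, period: 60.0, now: 145.0)
# The oldest entry is 100 s old, which is longer than the period.
wait_expired = retry_in_sketch([130.0, 120.0, 100.0], limit: 3, period: 60.0, now: 200.0)
```

In the first call the oldest entry is 45 seconds old, so the remaining wait is 60 - 45 = 15 seconds; the other two calls return 0.0.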
#throttled?(*job_args) ⇒ Boolean
Returns whether the job is throttled.
# File 'lib/sidekiq/throttled/strategy/threshold.rb', line 61
def throttled?(*job_args)
  job_limit = limit(job_args)
  return false unless job_limit
  return true if job_limit <= 0

  keys = [key(job_args)]
  argv = [job_limit, period(job_args), Time.now.to_f]

  Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end
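The Lua script behind `SCRIPT` is not shown on this page. The following in-memory sketch shows one way a list-based threshold check consistent with `#count` and `#retry_in` could behave; it is an assumption about the general technique, not the gem's actual script. The class `ThresholdSketch` and its internals are hypothetical.

```ruby
# In-memory sketch of a list-based threshold check.
# Assumption: this illustrates the general technique only.
class ThresholdSketch
  def initialize(limit:, period:)
    @limit = limit
    @period = period.to_f
    @timestamps = [] # newest first, so the last element is the oldest entry
  end

  # Returns true when a job arriving at time `now` should be throttled.
  def throttled?(now)
    return true if @limit <= 0

    # The list is full and its oldest entry is still inside the period.
    return true if @timestamps.length >= @limit && now - @timestamps.last < @period

    @timestamps.unshift(now)                # record this job at the head
    @timestamps = @timestamps.first(@limit) # keep only the newest `limit` entries
    false
  end
end

sketch  = ThresholdSketch.new(limit: 2, period: 10)
results = [0.0, 1.0, 2.0, 11.0].map { |t| sketch.throttled?(t) }
```

With a limit of 2 jobs per 10 seconds, the first two jobs pass, the third (at 2 s) is throttled, and the job at 11 s passes because the oldest recorded entry is by then more than 10 seconds old.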