Class: Sidekiq::Throttler::RateLimit
- Inherits:
-
Object
- Object
- Sidekiq::Throttler::RateLimit
- Defined in:
- lib/sidekiq/throttler/rate_limit.rb
Overview
Handles the tracking of rate limits.
TODO: Consider reducing threshold
and period
to smooth out job
executions so that "24 jobs every 1 hour" becomes "1 job every 2 minutes
and 30 seconds"
Constant Summary collapse
- LOCK =
Mutex.new
Instance Attribute Summary collapse
-
#payload ⇒ Array
readonly
The message payload for the current job.
-
#queue ⇒ String
readonly
The queue to rate limit.
-
#worker ⇒ Sidekiq::Worker
readonly
The worker to rate limit.
Instance Method Summary collapse
-
#can_throttle? ⇒ true, false
Check if rate limiting options were correctly specified on the worker.
-
#count ⇒ Integer
Fetch the number of jobs executed.
-
#exceeded {|delay| ... } ⇒ Object
Set a callback to be executed when #execute is called and the rate limit has exceeded the threshold.
-
#exceeded? ⇒ true, false
Check if rate limit has exceeded the threshold.
-
#execute ⇒ Object
Executes a callback (#within_bounds, or #exceeded) depending on the state of the rate limit.
-
#executions ⇒ Object
Get the storage backend.
-
#increment ⇒ Integer
Increment the count of jobs executed.
-
#initialize(worker, payload, queue, options = {}) ⇒ RateLimit
constructor
A new instance of RateLimit.
-
#key ⇒ String
The key name used when storing counters for jobs.
-
#options ⇒ {String => Float, Integer}
Returns the rate limit options for the current running worker.
-
#period ⇒ Float
The number of seconds in the rate limit period.
-
#reset! ⇒ Object
Reset the tracking of job executions.
-
#threshold ⇒ Integer
The number of jobs that are allowed within the
period
. -
#within_bounds(&block) ⇒ Object
Set a callback to be executed when #execute is called and the rate limit has not exceeded the threshold.
-
#within_bounds? ⇒ true, false
Check if rate limit is within the threshold.
Constructor Details
#initialize(worker, payload, queue, options = {}) ⇒ RateLimit
Returns a new instance of RateLimit.
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 42 def initialize(worker, payload, queue, = {}) @worker = worker @payload = payload @queue = queue unless @storage_class = lookup_storage(.fetch(:storage, :memory)) raise ArgumentError, "Unrecognized storage backend: #{[:storage].inspect}" end end |
Instance Attribute Details
#payload ⇒ Array (readonly)
Returns The message payload for the current job.
23 24 25 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 23 def payload @payload end |
#queue ⇒ String (readonly)
Returns The queue to rate limit.
28 29 30 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 28 def queue @queue end |
#worker ⇒ Sidekiq::Worker (readonly)
Returns The worker to rate limit.
18 19 20 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 18 def worker @worker end |
Instance Method Details
#can_throttle? ⇒ true, false
Check if rate limiting options were correctly specified on the worker.
108 109 110 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 108 def can_throttle? [threshold, period].select(&:zero?).empty? end |
#count ⇒ Integer
Fetch the number of jobs executed.
58 59 60 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 58 def count self.class.count(self) end |
#exceeded {|delay| ... } ⇒ Object
Set a callback to be executed when #execute is called and the rate limit has exceeded the threshold.
141 142 143 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 141 def exceeded(&block) @exceeded = block end |
#exceeded? ⇒ true, false
Check if rate limit has exceeded the threshold.
116 117 118 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 116 def exceeded? count >= threshold end |
#execute ⇒ Object
Executes a callback (#within_bounds, or #exceeded) depending on the state of the rate limit.
148 149 150 151 152 153 154 155 156 157 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 148 def execute return @within_bounds.call unless can_throttle? if exceeded? @exceeded.call(period) else increment @within_bounds.call end end |
#executions ⇒ Object
Get the storage backend.
167 168 169 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 167 def executions @storage_class.instance end |
#increment ⇒ Integer
Increment the count of jobs executed.
67 68 69 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 67 def increment self.class.increment(self) end |
#key ⇒ String
Returns The key name used when storing counters for jobs.
96 97 98 99 100 101 102 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 96 def key @key ||= if ['key'] ['key'].respond_to?(:call) ? ['key'].call(*payload) : ['key'] else "#{@worker.class.to_s.underscore.gsub('/', ':')}:#{@queue}" end end |
#options ⇒ {String => Float, Integer}
Returns the rate limit options for the current running worker.
75 76 77 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 75 def @options ||= (worker.class.['throttle'] || {}).stringify_keys end |
#period ⇒ Float
Returns The number of seconds in the rate limit period.
89 90 91 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 89 def period @period ||= (['period'].respond_to?(:call) ? ['period'].call(*payload) : ['period']).to_f end |
#reset! ⇒ Object
Reset the tracking of job executions.
161 162 163 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 161 def reset! executions.reset end |
#threshold ⇒ Integer
Returns The number of jobs that are allowed within the period
.
82 83 84 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 82 def threshold @threshold ||= (['threshold'].respond_to?(:call) ? ['threshold'].call(*payload) : ['threshold']).to_i end |
#within_bounds(&block) ⇒ Object
Set a callback to be executed when #execute is called and the rate limit has not exceeded the threshold.
131 132 133 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 131 def within_bounds(&block) @within_bounds = block end |
#within_bounds? ⇒ true, false
Check if rate limit is within the threshold.
124 125 126 |
# File 'lib/sidekiq/throttler/rate_limit.rb', line 124 def within_bounds? !exceeded? end |