Class: Sidekiq::Throttled::Strategy::Concurrency

Inherits:
Object
Includes:
Base
Defined in:
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Concurrency throttling strategy

Instance Method Summary

Methods included from Base

#limit

Constructor Details

#initialize(strategy_key, limit:, ttl: 900, key_suffix: nil) ⇒ Concurrency

Returns a new instance of Concurrency.

Parameters:

  • strategy_key (#to_s)
  • limit (#to_i, #call)

    Maximum number of jobs allowed to run concurrently, across all processors, for the given key.

  • ttl (#to_i) (defaults to: 900)

    Concurrency lock TTL in seconds.

  • key_suffix (Proc) (defaults to: nil)

    Dynamic key suffix generator.



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 31

def initialize(strategy_key, limit:, ttl: 900, key_suffix: nil)
  @base_key   = "#{strategy_key}:concurrency.v2"
  @limit      = limit
  @ttl        = ttl.to_i
  @key_suffix = key_suffix
end
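
As a rough illustration of how these parameters could interact, the sketch below rebuilds the Redis key and resolves the limit in plain Ruby. The `KeySketch` class and its `key` and `limit` helpers are hypothetical stand-ins: the real helpers come from `Base`, whose source is not shown on this page, so the colon-joined suffix and the way a callable limit receives the job arguments are assumptions.

```ruby
# Hypothetical stand-in for the strategy; it never talks to Redis.
class KeySketch
  def initialize(strategy_key, limit:, ttl: 900, key_suffix: nil)
    @base_key   = "#{strategy_key}:concurrency.v2"
    @limit      = limit
    @ttl        = ttl.to_i
    @key_suffix = key_suffix
  end

  # Assumption: a dynamic suffix is appended to the base key with a colon.
  def key(job_args)
    return @base_key unless @key_suffix

    "#{@base_key}:#{@key_suffix.call(*job_args)}"
  end

  # Assumption: a callable limit is invoked with the job arguments.
  def limit(job_args = [])
    @limit.respond_to?(:call) ? @limit.call(*job_args) : @limit
  end
end

static  = KeySketch.new("my_job", limit: 5)
dynamic = KeySketch.new(
  "my_job",
  limit:      ->(user_id) { user_id == 1 ? 10 : 2 },
  key_suffix: ->(user_id) { "user:#{user_id}" }
)

static_key  = static.key([42])    # one shared key for every job
dynamic_key = dynamic.key([42])   # one key per user
dynamic_cap = dynamic.limit([1])  # limit resolved from the job arguments
```

Under these assumptions, a `key_suffix` splits one strategy into several independent pools, each with its own count.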

Instance Method Details

#count(*job_args) ⇒ Integer

Returns the current count of jobs.

Returns:

  • (Integer)

    Current count of jobs



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 66

def count(*job_args)
  Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i
end

#dynamic? ⇒ Boolean

Returns whether the strategy has a dynamic config.

Returns:

  • (Boolean)

    Whether the strategy has a dynamic config



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 39

def dynamic?
  @key_suffix || @limit.respond_to?(:call)
end
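
Because the method body is a plain `||` expression, the result is truthy or falsy rather than strictly `true` or `false`. The sketch below reproduces that expression in a hypothetical helper, `dynamic_sketch`, to show the three cases.

```ruby
# Hypothetical helper reproducing the expression used by dynamic?.
def dynamic_sketch(key_suffix, limit)
  key_suffix || limit.respond_to?(:call)
end

suffix = ->(user_id) { user_id }

static_result  = dynamic_sketch(nil, 5)         # fixed limit, no suffix
callable_limit = dynamic_sketch(nil, -> { 5 })  # limit is callable
with_suffix    = dynamic_sketch(suffix, 5)      # returns the Proc itself
```

Callers that need a strict Boolean can wrap the result with `!!`.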

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Remove jid from the pool of jobs in progress



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 78

def finalize!(jid, *job_args)
  Sidekiq.redis { |conn| conn.zrem(key(job_args), jid.to_s) }
end

#reset!(*job_args) ⇒ void

This method returns an undefined value.

Resets count of jobs



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 72

def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }
end

#retry_in(_jid, *job_args) ⇒ Float

Returns How long, in seconds, before we’ll next be able to take on jobs.

Returns:

  • (Float)

    How long, in seconds, before we’ll next be able to take on jobs



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 56

def retry_in(_jid, *job_args)
  job_limit = limit(job_args)
  return 0.0 if !job_limit || count(*job_args) < job_limit

  oldest_jid_with_score = Sidekiq.redis { |redis| redis.zrange(key(job_args), 0, 0, withscores: true) }.first
  expiry_time = oldest_jid_with_score.last.to_f
  expiry_time - Time.now.to_f
end
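
The arithmetic above treats the score of the oldest sorted-set entry as its expiry timestamp, so the wait is that timestamp minus the current time. A minimal sketch of the same calculation on an in-memory Hash follows; `retry_in_sketch` and its arguments are hypothetical, and the guard for an empty pool is an addition of the sketch rather than part of the listed source.

```ruby
# Hypothetical helper: entries maps jid => expiry timestamp (the assumed score).
def retry_in_sketch(entries, limit, now)
  # Spare capacity means a job can be taken on immediately.
  return 0.0 if entries.size < limit

  oldest_expiry = entries.values.min
  # Guard added for the sketch: nothing to wait for in an empty pool.
  return 0.0 if oldest_expiry.nil?

  oldest_expiry - now
end

entries = { "a" => 1900.0, "b" => 1950.0 }

wait_full = retry_in_sketch(entries, 2, 1000.0) # pool full: wait for "a" to expire
wait_free = retry_in_sketch(entries, 3, 1000.0) # one slot left: no wait
```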

#throttled?(jid, *job_args) ⇒ Boolean

Returns whether the job is throttled.

Returns:

  • (Boolean)

    Whether the job is throttled



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 44

def throttled?(jid, *job_args)
  job_limit = limit(job_args)
  return false unless job_limit
  return true if job_limit <= 0

  keys = [key(job_args)]
  argv = [jid.to_s, job_limit, @ttl, Time.now.to_f]

  Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end
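
The body of `SCRIPT` is not shown on this page. The sketch below is one plausible in-memory model of what such a script might do, assuming it prunes expired locks, refuses a new jid when the pool is full, and otherwise records the jid with an expiry of `now + ttl`. `PoolSketch` is a hypothetical class that uses a Hash in place of the Redis sorted set.

```ruby
# Hypothetical in-memory model of one concurrency pool (no Redis involved).
class PoolSketch
  def initialize(limit:, ttl: 900)
    @limit = limit
    @ttl   = ttl
    @jobs  = {} # jid => expiry timestamp, standing in for the sorted set
  end

  # Assumed script behavior: prune expired locks, then admit or refuse.
  def throttled?(jid, now: Time.now.to_f)
    return true if @limit <= 0

    @jobs.delete_if { |_jid, expires_at| expires_at <= now }
    return true if @jobs.size >= @limit && !@jobs.key?(jid.to_s)

    @jobs[jid.to_s] = now + @ttl
    false
  end

  # Mirrors finalize!: release the slot held by jid.
  def finalize!(jid)
    @jobs.delete(jid.to_s)
  end

  # Mirrors count: number of locks currently held.
  def count
    @jobs.size
  end
end

pool = PoolSketch.new(limit: 2, ttl: 900)

first   = pool.throttled?("a", now: 1000.0) # admitted
second  = pool.throttled?("b", now: 1000.0) # admitted
third   = pool.throttled?("c", now: 1000.0) # refused: pool is full
pool.finalize!("a")
retry_c = pool.throttled?("c", now: 1001.0) # admitted after a slot frees up
expired = pool.throttled?("d", now: 2000.0) # admitted once older locks expire
```

In this model the `ttl` acts as a safety net: a job that never reaches `finalize!` stops occupying its slot once the lock expires.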