Class: Sidekiq::Throttled::Strategy::Concurrency

Inherits:
Object
Defined in:
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Concurrency throttling strategy

Constructor Details

#initialize(strategy_key, opts) ⇒ Concurrency

Returns a new instance of Concurrency.

Parameters:

  • strategy_key (#to_s)
  • opts (Hash)

Options Hash (opts):

  • :limit (#to_i)

    Number of concurrent job processors allowed to run for the given key

  • :ttl (#to_i) — default: 15 minutes

    Concurrency lock TTL in seconds

  • :key_suffix (Object)

    Proc for dynamic key suffix.



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 27

def initialize(strategy_key, opts)
  @base_key   = "#{strategy_key}:concurrency".freeze
  @limit      = opts.fetch(:limit)
  @ttl        = opts.fetch(:ttl, 900).to_i
  @key_suffix = opts[:key_suffix]
end
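
The option handling above can be tried without Sidekiq or Redis. The following is a minimal sketch: `ConcurrencyOptions` is a hypothetical stand-in class (not part of the library) that copies only the constructor body shown above, to illustrate which options are required, which have defaults, and how the TTL is coerced.

```ruby
# Hypothetical stand-in that mirrors only the option handling shown above.
# It is not Sidekiq::Throttled::Strategy::Concurrency itself.
class ConcurrencyOptions
  attr_reader :base_key, :limit, :ttl, :key_suffix

  def initialize(strategy_key, opts)
    @base_key   = "#{strategy_key}:concurrency".freeze
    @limit      = opts.fetch(:limit)          # required: KeyError when missing
    @ttl        = opts.fetch(:ttl, 900).to_i  # 900 seconds = 15 minutes
    @key_suffix = opts[:key_suffix]           # optional: nil when missing
  end
end

defaults = ConcurrencyOptions.new("my_job", limit: 5)
defaults.base_key   # "my_job:concurrency"
defaults.ttl        # 900
defaults.key_suffix # nil

# The TTL is coerced with #to_i, so a numeric string is accepted.
custom = ConcurrencyOptions.new(:reports, limit: 2, ttl: "60")
custom.ttl # 60

# Omitting :limit raises KeyError, because Hash#fetch has no default here.
missing_limit =
  begin
    ConcurrencyOptions.new("my_job", {})
    false
  rescue KeyError
    true
  end
```

Note that `:limit` is stored as given and only converted later, which is what allows it to be a callable.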

Instance Method Details

#count(*job_args) ⇒ Integer

Returns the current count of jobs.

Returns:

  • (Integer)

    The current count of jobs



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 51

def count(*job_args)
  Sidekiq.redis { |conn| conn.scard(key(job_args)) }.to_i
end

#dynamic? ⇒ Boolean

Returns whether the strategy has dynamic config.

Returns:

  • (Boolean)

    Whether the strategy has dynamic config



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 41

def dynamic?
  @key_suffix || @limit.respond_to?(:call)
end
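
As a small illustration of the expression above, the sketch below evaluates the same logic in a free-standing method. `dynamic_config?` is a hypothetical helper name used only for this example. It shows that the result is truthy or falsy rather than strictly `true` or `false`: when a key suffix is set, the suffix object itself is returned.

```ruby
# Hypothetical helper reproducing the body of #dynamic? outside the class.
def dynamic_config?(key_suffix, limit)
  key_suffix || limit.respond_to?(:call)
end

# Static limit and no key suffix: not dynamic.
static = dynamic_config?(nil, 5) # false

# A callable limit makes the strategy dynamic.
by_limit = dynamic_config?(nil, ->(_user_id) { 3 }) # true

# A key suffix makes it dynamic too; the proc itself is the (truthy) result.
suffix_proc = ->(user_id) { user_id }
by_suffix   = dynamic_config?(suffix_proc, 5)
```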

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Remove jid from the pool of jobs in progress



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 63

def finalize!(jid, *job_args)
  Sidekiq.redis { |conn| conn.srem(key(job_args), jid.to_s) }
end

#limit(job_args = nil) ⇒ Integer

Returns the number of concurrent job processors allowed.

Returns:

  • (Integer)

    Number of concurrent job processors allowed



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 35

def limit(job_args = nil)
  return @limit.to_i unless @limit.respond_to? :call
  @limit.call(*job_args).to_i
end
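
The two branches above (static value versus callable) can be sketched in isolation. `resolve_limit` and the `per_plan` lambda are hypothetical names introduced for this example only; the body follows the method shown above.

```ruby
# Hypothetical helper reproducing the body of #limit outside the class.
def resolve_limit(limit, job_args = nil)
  return limit.to_i unless limit.respond_to?(:call)

  limit.call(*job_args).to_i
end

# Static limits are coerced with #to_i.
static = resolve_limit("10") # 10

# Callable limits receive the job arguments, splatted.
per_plan = ->(plan) { plan == "premium" ? 10 : 2 }
premium  = resolve_limit(per_plan, ["premium"]) # 10
free     = resolve_limit(per_plan, ["free"])    # 2

# With the default job_args of nil, *nil expands to no arguments,
# so the callable must accept zero arguments in that case.
no_args = resolve_limit(-> { 4 }) # 4
```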

#reset!(*job_args) ⇒ void

This method returns an undefined value.

Resets the count of jobs



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 57

def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }.to_i
end

#throttled?(jid, *job_args) ⇒ Boolean

Returns whether the job is throttled.

Returns:

  • (Boolean)

    Whether the job is throttled



# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 46

def throttled?(jid, *job_args)
  1 == SCRIPT.eval([key(job_args)], [limit(job_args), @ttl, jid.to_s])
end
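
The Lua script behind `SCRIPT` is not shown on this page, so its exact behavior is not documented here. The sketch below is an in-memory model under a stated assumption: a job is throttled when the set of running jids is already at the limit and the jid does not already hold a slot; otherwise the jid is added to the set. `InMemoryConcurrency` is a hypothetical class that uses a Ruby `Set` in place of the Redis set, and it ignores the TTL entirely.

```ruby
require "set"

# In-memory stand-in for the Redis set of running jids.
# Assumed semantics only; this is not the library's implementation.
class InMemoryConcurrency
  def initialize(limit)
    @limit = limit
    @jids  = Set.new
  end

  # Assumption: throttled when the set is full and jid holds no slot yet.
  def throttled?(jid)
    return true if @jids.size >= @limit && !@jids.include?(jid.to_s)

    @jids.add(jid.to_s)
    false
  end

  # Mirrors #count: number of jids currently holding a slot.
  def count
    @jids.size
  end

  # Mirrors #finalize!: release the slot held by jid.
  def finalize!(jid)
    @jids.delete(jid.to_s)
  end
end

pool   = InMemoryConcurrency.new(2)
first  = pool.throttled?("jid-1") # false, takes slot 1 of 2
second = pool.throttled?("jid-2") # false, takes slot 2 of 2
third  = pool.throttled?("jid-3") # true, no free slot

pool.finalize!("jid-1")                # jid-1 finishes and frees its slot
retry_third = pool.throttled?("jid-3") # false, the freed slot is taken
```

Under this model, `finalize!` is what frees capacity; in the real strategy the `:ttl` option presumably bounds how long a slot can be held if `finalize!` is never called.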