Module: Sidekiq::Throttled

Defined in:
lib/sidekiq/throttled.rb,
lib/sidekiq/throttled/job.rb,
lib/sidekiq/throttled/web.rb,
lib/sidekiq/throttled/config.rb,
lib/sidekiq/throttled/errors.rb,
lib/sidekiq/throttled/worker.rb,
lib/sidekiq/throttled/message.rb,
lib/sidekiq/throttled/version.rb,
lib/sidekiq/throttled/cooldown.rb,
lib/sidekiq/throttled/registry.rb,
lib/sidekiq/throttled/strategy.rb,
lib/sidekiq/throttled/web/stats.rb,
lib/sidekiq/throttled/expirable_set.rb,
lib/sidekiq/throttled/strategy/base.rb,
lib/sidekiq/throttled/middlewares/server.rb,
lib/sidekiq/throttled/strategy/threshold.rb,
lib/sidekiq/throttled/patches/basic_fetch.rb,
lib/sidekiq/throttled/patches/super_fetch.rb,
lib/sidekiq/throttled/strategy_collection.rb,
lib/sidekiq/throttled/strategy/concurrency.rb,
lib/sidekiq/throttled/patches/throttled_retriever.rb

Overview

Concurrency and threshold throttling for Sidekiq.

Just add somewhere in your bootstrap:

require "sidekiq/throttled"

Once you’ve done that you can include Job to your job classes and configure throttling:

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options :queue => :my_queue

  sidekiq_throttle({
    # Allow maximum 10 concurrent jobs of this class at a time.
    :concurrency => { :limit => 10 },
    # Allow maximum 1K jobs being processed within one hour window.
    :threshold => { :limit => 1_000, :period => 1.hour }
  })

  def perform
    # ...
  end
end

Defined Under Namespace

Modules: Job, Middlewares, Patches, Registry, Web Classes: Config, Cooldown, Error, ExpirableSet, Message, Strategy, StrategyCollection

Constant Summary collapse

Worker =

A new module, Sidekiq::Job, was added in Sidekiq version 6.3.0 as a simple alias for Sidekiq::Worker as the term “worker” was considered too generic and confusing. Many people call a Sidekiq process a “worker” whereas others call the thread that executes jobs a “worker”.

Job
VERSION =

Gem version

"1.5.0"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configConfig? (readonly)

Returns:



60
61
62
# File 'lib/sidekiq/throttled.rb', line 60

def config
  @config
end

.cooldownCooldown? (readonly)

Returns:



55
56
57
# File 'lib/sidekiq/throttled.rb', line 55

def cooldown
  @cooldown
end

Class Method Details

.configure {|config| ... } ⇒ Object

Examples:

Sidekiq::Throttled.configure do |config|
  config.cooldown_period = nil # Disable queues cooldown manager
end

Yield Parameters:



68
69
70
71
72
73
74
75
76
77
# File 'lib/sidekiq/throttled.rb', line 68

def configure
  MUTEX.synchronize do
    config = @config.dup

    yield config

    @config   = config.freeze
    @cooldown = Cooldown[@config]
  end
end

.requeue_throttled(work) ⇒ void

This method returns an undefined value.

Return throttled job to be executed later, delegating the details of how to do that to the Strategy for that job.



100
101
102
103
104
105
106
107
# File 'lib/sidekiq/throttled.rb', line 100

def requeue_throttled(work)
  message = JSON.parse(work.job)
  job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } })

  Registry.get job_class do |strategy|
    strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options)
  end
end

.throttled?(message) ⇒ Boolean

Tells whenever job is throttled or not.

Parameters:

  • message (String)

    Job’s JSON payload

Returns:

  • (Boolean)


83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/sidekiq/throttled.rb', line 83

def throttled?(message)
  message = Message.new(message)
  return false unless message.job_class && message.job_id

  Registry.get(message.job_class) do |strategy|
    return strategy.throttled?(message.job_id, *message.job_args)
  end

  false
rescue StandardError
  false
end