Class: Sidekiq::Throttled::Strategy
- Inherits: Object
- Inheritance chain: Object, Sidekiq::Throttled::Strategy
- Defined in: lib/sidekiq/throttled/strategy.rb, lib/sidekiq/throttled/strategy/base.rb, lib/sidekiq/throttled/strategy/threshold.rb, lib/sidekiq/throttled/strategy/concurrency.rb
Overview
Meta-strategy that couples Concurrency and Threshold strategies.
Defined Under Namespace
Modules: Base
Classes: Concurrency, Threshold
Instance Attribute Summary
- #concurrency ⇒ Object (readonly)
  Returns the value of attribute concurrency.
- #observer ⇒ Object (readonly)
  Returns the value of attribute observer.
- #threshold ⇒ Object (readonly)
  Returns the value of attribute threshold.
Instance Method Summary
- #dynamic? ⇒ Boolean
  Whether the strategy has dynamic config.
- #finalize!(jid, *job_args) ⇒ void
  Marks job as being processed.
- #initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) ⇒ Strategy (constructor)
  Returns a new instance of Strategy.
- #requeue_throttled(work, with:, to: nil) ⇒ void
  Returns the throttled job to a queue to be executed later.
- #reset! ⇒ void
  Resets the job counts of all available strategies.
- #throttled?(jid, *job_args) ⇒ Boolean
  Whether the job is throttled.
Constructor Details
#initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) ⇒ Strategy
Returns a new instance of Strategy.
    # File 'lib/sidekiq/throttled/strategy.rb', line 34
    def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
      @observer = observer

      @concurrency = StrategyCollection.new(concurrency,
        strategy: Concurrency,
        name: name,
        key_suffix: key_suffix)

      @threshold = StrategyCollection.new(threshold,
        strategy: Threshold,
        name: name,
        key_suffix: key_suffix)

      raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
    end
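The guard clause at the end of the constructor means that a strategy must be given at least one of :concurrency or :threshold. The following is a minimal, self-contained sketch of that rule only. SketchCollection and SketchStrategy are hypothetical stand-ins, not library classes, and the `{ limit: 10 }` config is an illustrative value rather than a documented option set.

```ruby
# Hypothetical stand-in for the library's internal StrategyCollection.
# It only wraps the given config(s) in an array so that #any? works.
class SketchCollection
  def initialize(config)
    @items = config.is_a?(Hash) ? [config] : Array(config)
  end

  def any?
    @items.any?
  end
end

# Hypothetical miniature of Strategy#initialize, reduced to the guard clause.
class SketchStrategy
  attr_reader :concurrency, :threshold, :observer

  def initialize(name, concurrency: nil, threshold: nil, observer: nil)
    @name        = name
    @observer    = observer
    @concurrency = SketchCollection.new(concurrency)
    @threshold   = SketchCollection.new(threshold)

    raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
  end
end

# One limit is enough to build a strategy.
ok = SketchStrategy.new(:my_job, concurrency: { limit: 10 })

# With no limits at all, the constructor raises.
error = begin
  SketchStrategy.new(:my_job)
  nil
rescue ArgumentError => e
  e.message
end
```

In this sketch, `ok` is built with a concurrency limit only, while `error` holds the message raised when both options are omitted.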
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
    # File 'lib/sidekiq/throttled/strategy.rb', line 17
    def concurrency
      @concurrency
    end
#observer ⇒ Object (readonly)
Returns the value of attribute observer.
    # File 'lib/sidekiq/throttled/strategy.rb', line 25
    def observer
      @observer
    end
#threshold ⇒ Object (readonly)
Returns the value of attribute threshold.
    # File 'lib/sidekiq/throttled/strategy.rb', line 21
    def threshold
      @threshold
    end
Instance Method Details
#dynamic? ⇒ Boolean
Returns whether the strategy has dynamic config.
    # File 'lib/sidekiq/throttled/strategy.rb', line 51
    def dynamic?
      return true if @concurrency&.dynamic?
      return true if @threshold&.dynamic?

      false
    end
#finalize!(jid, *job_args) ⇒ void
This method returns an undefined value.
Marks job as being processed.
    # File 'lib/sidekiq/throttled/strategy.rb', line 102
    def finalize!(jid, *job_args)
      @concurrency&.finalize!(jid, *job_args)
    end
#requeue_throttled(work, with:, to: nil) ⇒ void
This method returns an undefined value.
Returns the throttled job to a queue to be executed later. The implementation depends on the value of :with:
- :enqueue puts the job back at the end of the queue immediately.
- :schedule schedules the job to be enqueued at a later time, when capacity is expected to be available.
If :with is a Proc, it is called with the job arguments and must return one of these symbols.
    # File 'lib/sidekiq/throttled/strategy.rb', line 83
    def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
      # Resolve :with and :to arguments, calling them if they are Procs
      job_args = JSON.parse(work.job)["args"]
      requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
      target_queue = calc_target_queue(work, to)

      case requeue_with
      when :enqueue
        re_enqueue_throttled(work, target_queue)
      when :schedule
        # Find out when we will next be able to execute this job, and reschedule for then.
        reschedule_throttled(work, requeue_to: target_queue)
      else
        raise "unrecognized :with option #{with}"
      end
    end
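The way :with is resolved can be illustrated in isolation. The sketch below reproduces only the first steps of the method: parsing the job arguments from the JSON payload and calling :with when it responds to #call. The helper name resolve_requeue_with and the ReportJob payload are hypothetical and exist only for this example.

```ruby
require "json"

# Hypothetical helper mirroring how requeue_throttled resolves :with.
# job_json is a serialized job payload; with is a Symbol or a callable.
def resolve_requeue_with(job_json, with)
  job_args = JSON.parse(job_json)["args"]
  resolved = with.respond_to?(:call) ? with.call(*job_args) : with

  case resolved
  when :enqueue, :schedule then resolved
  else raise "unrecognized :with option #{resolved.inspect}"
  end
end

# Illustrative payload; only the "args" key matters for this sketch.
payload = JSON.generate("class" => "ReportJob", "args" => [42, "urgent"])

# A plain Symbol is used as-is.
static = resolve_requeue_with(payload, :enqueue)

# A callable receives the job arguments and picks the behaviour per job.
chooser = ->(_id, priority) { priority == "urgent" ? :enqueue : :schedule }
dynamic = resolve_requeue_with(payload, chooser)
```

Because a lambda enforces its arity, a callable passed as :with should accept as many arguments as the job takes, or use a splat.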
#reset! ⇒ void
This method returns an undefined value.
Resets the job counts of all available strategies.
    # File 'lib/sidekiq/throttled/strategy.rb', line 108
    def reset!
      @concurrency&.reset!
      @threshold&.reset!
    end
#throttled?(jid, *job_args) ⇒ Boolean
Returns whether the job is throttled.
    # File 'lib/sidekiq/throttled/strategy.rb', line 59
    def throttled?(jid, *job_args)
      if @concurrency&.throttled?(jid, *job_args)
        @observer&.call(:concurrency, *job_args)
        return true
      end

      if @threshold&.throttled?(*job_args)
        @observer&.call(:threshold, *job_args)
        finalize!(jid, *job_args)
        return true
      end

      false
    end
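The order of checks matters here: concurrency is tested first, then threshold, and the observer is told which of the two caused the throttling. When the threshold check throttles the job, finalize! is also called, presumably to give back the concurrency slot taken by the first check. The sketch below reproduces that flow with stubs. StubLimiter and sketch_throttled? are hypothetical names and do not talk to Redis or to the library.

```ruby
# Hypothetical stub standing in for a Concurrency or Threshold collection.
# `throttled` is the canned answer; `finalized` records a finalize! call.
StubLimiter = Struct.new(:throttled, :finalized) do
  def throttled?(*_args)
    throttled
  end

  def finalize!(*_args)
    self.finalized = true
  end
end

# Hypothetical miniature of the check order in Strategy#throttled?.
def sketch_throttled?(concurrency, threshold, observer, jid, *job_args)
  if concurrency&.throttled?(jid, *job_args)
    observer&.call(:concurrency, *job_args)
    return true
  end

  if threshold&.throttled?(*job_args)
    observer&.call(:threshold, *job_args)
    concurrency&.finalize!(jid, *job_args) # mirrors the finalize! call in the original
    return true
  end

  false
end

events      = []
observer    = ->(kind, *args) { events << [kind, args] }
concurrency = StubLimiter.new(false, false) # concurrency limit not reached
threshold   = StubLimiter.new(true, false)  # threshold limit reached

result = sketch_throttled?(concurrency, threshold, observer, "jid-1", 42)
```

In this run the observer receives :threshold together with the job arguments, and the concurrency stub records that finalize! was called.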