Class: Sidekiq::Throttled::Strategy

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/throttled/strategy.rb,
lib/sidekiq/throttled/strategy/base.rb,
lib/sidekiq/throttled/strategy/threshold.rb,
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Meta-strategy that couples Concurrency and Threshold strategies.

Defined Under Namespace

Modules: Base Classes: Concurrency, Threshold

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) ⇒ Strategy

Returns a new instance of Strategy.

Parameters:

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sidekiq/throttled/strategy.rb', line 34

def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
  @observer = observer

  @concurrency = StrategyCollection.new(concurrency,
    strategy:   Concurrency,
    name:       name,
    key_suffix: key_suffix)

  @threshold = StrategyCollection.new(threshold,
    strategy:   Threshold,
    name:       name,
    key_suffix: key_suffix)

  raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



17
18
19
# File 'lib/sidekiq/throttled/strategy.rb', line 17

def concurrency
  @concurrency
end

#observerObject (readonly)

Returns the value of attribute observer.



25
26
27
# File 'lib/sidekiq/throttled/strategy.rb', line 25

def observer
  @observer
end

#thresholdObject (readonly)

Returns the value of attribute threshold.



21
22
23
# File 'lib/sidekiq/throttled/strategy.rb', line 21

def threshold
  @threshold
end

Instance Method Details

#dynamic?Boolean

Returns whenever strategy has dynamic config.

Returns:

  • (Boolean)

    whenever strategy has dynamic config



51
52
53
54
55
56
# File 'lib/sidekiq/throttled/strategy.rb', line 51

def dynamic?
  return true if @concurrency&.dynamic?
  return true if @threshold&.dynamic?

  false
end

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Marks job as being processed.



102
103
104
# File 'lib/sidekiq/throttled/strategy.rb', line 102

def finalize!(jid, *job_args)
  @concurrency&.finalize!(jid, *job_args)
end

#requeue_throttled(work, with:, to: nil) ⇒ void

This method returns an undefined value.

Return throttled job to be executed later. Implementation depends on the value of ‘with`: :enqueue means put the job back at the end of the queue immediately :schedule means schedule enqueueing the job for a later time when we expect to have capacity

Parameters:

  • with (#to_s, #call)

    How to handle the throttled job

  • to (#to_s, #call) (defaults to: nil)

    Name of the queue to re-queue the job to. If not specified, will use the job’s original queue.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/sidekiq/throttled/strategy.rb', line 83

def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
  # Resolve :with and :to arguments, calling them if they are Procs
  job_args = JSON.parse(work.job)["args"]
  requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
  target_queue = calc_target_queue(work, to)

  case requeue_with
  when :enqueue
    re_enqueue_throttled(work, target_queue)
  when :schedule
    # Find out when we will next be able to execute this job, and reschedule for then.
    reschedule_throttled(work, requeue_to: target_queue)
  else
    raise "unrecognized :with option #{with}"
  end
end

#reset!void

This method returns an undefined value.

Resets count of jobs of all available strategies



108
109
110
111
# File 'lib/sidekiq/throttled/strategy.rb', line 108

def reset!
  @concurrency&.reset!
  @threshold&.reset!
end

#throttled?(jid, *job_args) ⇒ Boolean

Returns whenever job is throttled or not.

Returns:

  • (Boolean)

    whenever job is throttled or not.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq/throttled/strategy.rb', line 59

def throttled?(jid, *job_args)
  if @concurrency&.throttled?(jid, *job_args)
    @observer&.call(:concurrency, *job_args)
    return true
  end

  if @threshold&.throttled?(*job_args)
    @observer&.call(:threshold, *job_args)

    finalize!(jid, *job_args)
    return true
  end

  false
end