Class: Sidekiq::Throttled::BasicFetch

Inherits:
BasicFetch
  • Object
Defined in:
lib/sidekiq/throttled/basic_fetch.rb

Overview

Throttled version of `Sidekiq::BasicFetch` fetcher strategy.

Instance Method Summary

Constructor Details

#initialize(*args) ⇒ BasicFetch

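Class constructor. Sets up the mutex and the list of suspended queues, then passes all arguments to the parent constructor.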



# File 'lib/sidekiq/throttled/basic_fetch.rb', line 15

def initialize(*args)
  @mutex      = Mutex.new
  @suspended  = []

  super(*args)
end

Instance Method Details

#retrieve_work ⇒ Sidekiq::BasicFetch::UnitOfWork?

Returns:

  • (Sidekiq::BasicFetch::UnitOfWork, nil)


# File 'lib/sidekiq/throttled/basic_fetch.rb', line 23

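def retrieve_work
  # Pop the next job; bail out when nothing was fetched.
  work = brpop
  return unless work

  # Wrap the raw result and hand it over unless the job is throttled.
  work = ::Sidekiq::BasicFetch::UnitOfWork.new(*work)
  return work unless Throttled.throttled? work.message

  queue = "queue:#{work.queue_name}"

  # Throttled: record the queue as suspended and push the job back onto it.
  @mutex.synchronize { @suspended << queue }
  Sidekiq.redis { |conn| conn.lpush(queue, work.message) }

  # Nothing to process on this fetch.
  nil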
end
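
The push-back behaviour of `retrieve_work` can be illustrated without Redis or Sidekiq. The following is a minimal sketch, not the library's implementation: `InMemoryList`, `SketchFetch`, the `throttled:` callable, and the fixed `"queue:default"` name are all hypothetical stand-ins introduced for illustration. It assumes the list is popped from the tail and pushed back at the head, as the Redis `RPOP` and `LPUSH` commands do.

```ruby
# Hypothetical stand-in for a Redis list: index 0 is the head (left),
# the last element is the tail (right). Not part of sidekiq-throttled.
class InMemoryList
  def initialize(items = [])
    @items = items.dup
  end

  # Mirrors Redis LPUSH: add to the head.
  def lpush(item)
    @items.unshift(item)
  end

  # Mirrors a non-blocking RPOP: take from the tail, nil when empty.
  def rpop
    @items.pop
  end

  def to_a
    @items.dup
  end
end

# Hypothetical fetcher with the same overall shape as retrieve_work.
class SketchFetch
  attr_reader :suspended

  def initialize(list, throttled:)
    @list      = list
    @throttled = throttled # callable deciding whether a job is throttled
    @mutex     = Mutex.new
    @suspended = []
  end

  def retrieve_work
    job = @list.rpop
    return unless job
    return job unless @throttled.call(job)

    # Throttled: note the queue as suspended and push the job back.
    @mutex.synchronize { @suspended << "queue:default" }
    @list.lpush(job)
    nil
  end
end

# job_a sits at the tail, so it is popped first.
list  = InMemoryList.new(%w[job_c job_b job_a])
fetch = SketchFetch.new(list, throttled: ->(job) { job == "job_a" })

first  = fetch.retrieve_work # job_a is throttled, so nil comes back
second = fetch.retrieve_work # job_b is not throttled and is returned
```

In this sketch the throttled job ends up at the opposite end of the list from where it was popped, so other jobs in the same queue are fetched before it is tried again.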