Class: Sidekiq::Ultimate::Fetch
- Inherits:
-
Object
- Object
- Sidekiq::Ultimate::Fetch
- Defined in:
- lib/sidekiq/ultimate/fetch.rb
Overview
Throttled reliable fetcher implementing reliable queue pattern.
Constant Summary collapse
- TIMEOUT =
Delay between fetch retries in case of no job received.
2
Class Method Summary collapse
Instance Method Summary collapse
-
#bulk_requeue(units, _options) ⇒ Object
TODO: Requeue in batch or at least using pipeline.
-
#initialize(options) ⇒ Fetch
constructor
A new instance of Fetch.
-
#retrieve_work ⇒ UnitOfWork
If work can be processed.
Constructor Details
#initialize(options) ⇒ Fetch
Returns a new instance of Fetch.
20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/sidekiq/ultimate/fetch.rb', line 20 def initialize() @exhausted_by_throttling = ExpirableSet.new @empty_queues = Sidekiq::Ultimate::EmptyQueues.instance @strict = [:strict] ? true : false @queues = [:queues] @queues.uniq! if @strict @paused_queues = [] @paused_queues_expires_at = 0 end |
Class Method Details
.setup! ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/sidekiq/ultimate/fetch.rb', line 54 def self.setup! fetcher = new(Sidekiq) Sidekiq[:fetch] = fetcher Resurrector.setup! EmptyQueues.setup! end |
Instance Method Details
#bulk_requeue(units, _options) ⇒ Object
TODO: Requeue in batch or at least using pipeline
50 51 52 |
# File 'lib/sidekiq/ultimate/fetch.rb', line 50 def bulk_requeue(units, ) units.each(&:requeue) end |
#retrieve_work ⇒ UnitOfWork
Returns if work can be processed.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/sidekiq/ultimate/fetch.rb', line 33 def retrieve_work work = retrieve if work&.throttled? work.requeue_throttled @exhausted_by_throttling.add( work.queue_name, :ttl => Sidekiq::Ultimate::Configuration.instance.throttled_fetch_timeout_sec ) return nil end work end |