Class: Sidekiq::Fetcher
- Inherits: Object
  - Object
  - Sidekiq::Fetcher
- Defined in: lib/sidekiq/fetch.rb
Overview
The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.
Constant Summary
- TIMEOUT = 1
Constants included from Util
Class Method Summary
- .done! ⇒ Object
- .done? ⇒ Boolean
- .strategy ⇒ Object
Instance Method Summary
- #fetch ⇒ Object
  Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
- #handle_exception(ex) ⇒ Object
- #initialize(mgr, options) ⇒ Fetcher (constructor)
  A new instance of Fetcher.
Methods included from Actor
Methods included from Util
#hostname, #logger, #process_id, #redis, #watchdog
Constructor Details
Class Method Details
.done! ⇒ Object
Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.
# File 'lib/sidekiq/fetch.rb', line 68
def self.done!
  @done = true
end
.done? ⇒ Boolean
# File 'lib/sidekiq/fetch.rb', line 72
def self.done?
  @done
end
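The flag above lives in a class-level instance variable, so it is shared by every caller of the class methods rather than stored per instance. The sketch below illustrates that behaviour with a hypothetical `FlagHolder` class (not a Sidekiq name). One detail it makes visible: until `done!` is called the reader returns `nil`, which is falsy but not literally `false`.

```ruby
# Hypothetical stand-in that copies the shape of .done! and .done?.
class FlagHolder
  # Sets a class-level instance variable, not a per-object one.
  def self.done!
    @done = true
  end

  # Returns nil until done! has been called, then true.
  def self.done?
    @done
  end
end

before_shutdown = FlagHolder.done?
FlagHolder.done!
after_shutdown = FlagHolder.done?
```

Callers that only test truthiness, such as `return if FlagHolder.done?`, are unaffected by the `nil` start value.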
.strategy ⇒ Object
# File 'lib/sidekiq/fetch.rb', line 76
def self.strategy
  Sidekiq.options[:fetch] || BasicFetch
end
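The strategy lookup is a "configured value, else default" pattern. The sketch below reproduces that shape outside Sidekiq. `FakeConfig`, `DefaultFetch`, `ListFetch`, and `pick_strategy` are hypothetical names, and the constructor argument is an assumption; the only method taken from this page is `retrieve_work`, which `#fetch` calls on the strategy.

```ruby
# Hypothetical stand-ins; none of these names come from Sidekiq itself.
module FakeConfig
  # Plays the role of the options hash consulted by the lookup.
  def self.options
    @options ||= {}
  end
end

# Default strategy: never finds any work.
class DefaultFetch
  def initialize(_options); end

  def retrieve_work
    nil
  end
end

# Alternative strategy: pops work from an in-memory array.
class ListFetch
  def initialize(options)
    @items = options.fetch(:items, []).dup
  end

  def retrieve_work
    @items.shift
  end
end

# Same shape as the lookup: configured class, else the default.
def pick_strategy
  FakeConfig.options[:fetch] || DefaultFetch
end

default_class = pick_strategy
FakeConfig.options[:fetch] = ListFetch
custom_class = pick_strategy
first_item = custom_class.new(items: %w[a b]).retrieve_work
```

Because the lookup returns a class rather than an instance, the caller decides when and with which options to instantiate it.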
Instance Method Details
#fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
# File 'lib/sidekiq/fetch.rb', line 29
def fetch
  watchdog('Fetcher#fetch died') do
    return if Sidekiq::Fetcher.done?

    begin
      work = @strategy.retrieve_work
      ::Sidekiq.logger.info("Redis is online, #{Time.now.to_f - @down.to_f} sec downtime") if @down
      @down = nil

      if work
        @mgr.async.assign(work)
      else
        after(0) { fetch }
      end
    rescue => ex
      handle_exception(ex)
    end
  end
end
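The "reschedule instead of block or loop" idea can be sketched without any actor library. In the example below, `MiniFetcher` and its array-based mailbox are hypothetical stand-ins for the actor machinery, not Sidekiq or Celluloid APIs. Each call to `fetch` makes one bounded attempt and, if it finds nothing, queues another attempt rather than spinning in place, so a shutdown flag gets a chance to take effect between attempts.

```ruby
# Minimal sketch; MiniFetcher and its array "mailbox" are hypothetical.
class MiniFetcher
  attr_reader :assigned, :attempts

  def initialize(source)
    @source   = source # array of work items; nil means "nothing this time"
    @mailbox  = []     # pending calls, run one at a time
    @assigned = []
    @attempts = 0
    @done     = false
  end

  def done!
    @done = true
  end

  # One bounded attempt: check the shutdown flag, then either hand the
  # work over or queue another attempt.
  def fetch
    return if @done

    @attempts += 1
    work = @source.shift
    if work
      @assigned << work
    else
      @mailbox << -> { fetch } # reschedule rather than loop in place
    end
  end

  # Drain the mailbox, running at most max_steps queued calls.
  def run(max_steps)
    @mailbox << -> { fetch }
    max_steps.times do
      task = @mailbox.shift
      break unless task

      task.call
    end
  end
end

fetcher = MiniFetcher.new([nil, nil, "job-1"])
fetcher.run(10)
```

Here the first two attempts find nothing and reschedule; the third finds `"job-1"` and stops rescheduling. Calling `done!` afterwards turns any further `fetch` into a no-op.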
#handle_exception(ex) ⇒ Object
# File 'lib/sidekiq/fetch.rb', line 50
def handle_exception(ex)
  if !@down
    logger.error("Error fetching message: #{ex}")
    ex.backtrace.each do |bt|
      logger.error(bt)
    end
  end
  @down ||= Time.now
  sleep(TIMEOUT)
  after(0) { fetch }
rescue Task::TerminatedError
  # If redis is down when we try to shut down, all the fetch backlog
  # raises these errors. Haven't been able to figure out what I'm doing wrong.
end
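Together with `#fetch`, the `@down` variable implements a small outage tracker: the first failure in a row is logged and timestamped, later failures stay quiet, and the next success reports the downtime and clears the timestamp. The sketch below isolates that bookkeeping. `DowntimeTracker` is a hypothetical helper, not part of Sidekiq, and the clock value is passed in explicitly so the behaviour is deterministic.

```ruby
# Hypothetical helper, not part of Sidekiq: tracks one outage window.
class DowntimeTracker
  attr_reader :messages

  def initialize
    @down = nil
    @messages = []
  end

  # Called on every failure; only the first one in a row is recorded.
  def failure(error, now)
    @messages << "Error fetching message: #{error}" unless @down
    @down ||= now
  end

  # Called on every success; reports downtime once, then resets.
  def success(now)
    @messages << "online after #{now - @down} sec" if @down
    @down = nil
  end
end

tracker = DowntimeTracker.new
tracker.failure("connection refused", 100.0)
tracker.failure("connection refused", 101.0)
tracker.success(103.5)
tracker.success(104.0)
```

The `||=` assignment is what keeps the original outage start time when failures repeat, so the reported downtime spans the whole outage rather than only the last retry.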