Class: Sidekiq::Fetcher
- Inherits:
- Object
- Inheritance chain:
- Object → Sidekiq::Fetcher
- Includes:
- Celluloid, Util
- Defined in:
- lib/sidekiq/fetch.rb
Overview
The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.
Constant Summary
- TIMEOUT = 1
Constants included from Util
Class Method Summary
Instance Method Summary
- #fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
- #initialize(mgr, queues) ⇒ Fetcher (constructor)
A new instance of Fetcher.
Methods included from Util
#constantize, #logger, #process_id, #redis, #watchdog
Constructor Details
#initialize(mgr, queues) ⇒ Fetcher
Returns a new instance of Fetcher.
# File 'lib/sidekiq/fetch.rb', line 15
def initialize(mgr, queues)
  @mgr = mgr
  @queues = queues.map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end
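The constructor's mapping from queue names to Redis keys can be tried in isolation. The following is a minimal sketch, not Sidekiq code: `queue_keys` is a hypothetical helper that copies the `map` expression above, and the queue names are made up for illustration.

```ruby
# Hypothetical helper, not part of Sidekiq: mirrors the mapping in #initialize.
def queue_keys(queues)
  queues.map { |q| "queue:#{q}" }
end

# A queue name may appear more than once in the input list.
keys = queue_keys(%w[critical critical default])

# The constructor keeps both the full list and a de-duplicated copy.
unique_keys = keys.uniq

puts keys.inspect         # ["queue:critical", "queue:critical", "queue:default"]
puts unique_keys.inspect  # ["queue:critical", "queue:default"]
```

Keeping the duplicates in `@queues` next to `@unique_queues` suggests that repeated names carry meaning, possibly as a weighting, but the code that consumes them is not shown on this page, so treat that reading as an assumption.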
Class Method Details
.done! ⇒ Object
Ugh. Say hello to a bloody hack. There is no clean way to get the fetcher to just stop processing its mailbox when shutdown starts, so this sets a class-level flag that #fetch checks before doing any work.
# File 'lib/sidekiq/fetch.rb', line 55
def self.done!
  @done = true
end
.done? ⇒ Boolean
# File 'lib/sidekiq/fetch.rb', line 59
def self.done?
  @done
end
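The two class methods above store the flag in a class-level instance variable, so one call affects every instance. A rough sketch of that behaviour follows; `FlagDemo` is a hypothetical stand-in class, and its `fetch` only reports whether work would be skipped.

```ruby
# FlagDemo is a hypothetical stand-in, not the real Sidekiq::Fetcher.
class FlagDemo
  # Set the class-level flag; there is no way to unset it here.
  def self.done!
    @done = true
  end

  # Returns nil until done! has been called, then true.
  def self.done?
    @done
  end

  # Returns :skipped once the class-level flag is set, :fetched otherwise.
  def fetch
    return :skipped if self.class.done?
    :fetched
  end
end

first = FlagDemo.new
second = FlagDemo.new

before = first.fetch               # flag not set yet
FlagDemo.done!
later = [first.fetch, second.fetch] # both instances see the same flag

puts before.inspect
puts later.inspect
```

Note that `done?` as written returns `nil` rather than `false` before the flag is set. That is falsy, so the guard works, but it is not strictly a Boolean.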
Instance Method Details
#fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
# File 'lib/sidekiq/fetch.rb', line 29
def fetch
  watchdog('Fetcher#fetch died') do
    return if Sidekiq::Fetcher.done?

    begin
      queue = nil
      msg = nil
      Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }

      if msg
        @mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
      else
        after(0) { fetch }
      end
    rescue => ex
      logger.error("Error fetching message: #{ex}")
      logger.error(ex.backtrace.first)
      sleep(TIMEOUT)
      after(0) { fetch }
    end
  end
end
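The control flow of #fetch can be followed without Redis or Celluloid. The sketch below makes several assumptions: `ToyFetcher` is a hypothetical class, an in-memory array stands in for the blocking pop (a `nil` entry models a pop that timed out with no message), and a counter stands in for the `after(0) { fetch }` reschedule. Only the `gsub` call is taken verbatim from the method above.

```ruby
# ToyFetcher is a hypothetical stand-in, not the real Sidekiq::Fetcher.
class ToyFetcher
  attr_reader :assigned, :rescheduled

  # source: array whose entries are [queue_key, message] pairs,
  # or nil to model a blocking pop that timed out.
  def initialize(source)
    @source = source
    @assigned = []     # stands in for @mgr.assign!
    @rescheduled = 0   # stands in for after(0) { fetch }
    @done = false
  end

  def done!
    @done = true
  end

  # One fetch attempt: it never loops and never waits indefinitely.
  def fetch
    return :stopped if @done

    queue, msg = @source.shift
    if msg
      # Strip everything up to and including "queue:" to recover the name.
      @assigned << [msg, queue.gsub(/.*queue:/, '')]
      :assigned
    else
      @rescheduled += 1
      :rescheduled
    end
  end
end

fetcher = ToyFetcher.new([nil, ["queue:default", "job-1"]])
results = [fetcher.fetch, fetcher.fetch]
fetcher.done!
results << fetcher.fetch

puts results.inspect           # [:rescheduled, :assigned, :stopped]
puts fetcher.assigned.inspect  # [["job-1", "default"]]
```

Because each attempt returns quickly and the shutdown flag is checked at the start of the next one, a fetcher built this way stops within roughly one timeout interval of shutdown, which appears to be the point of rescheduling rather than looping.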