Class: Sidekiq::Fetcher

Inherits:
Object
  • Object
show all
Includes:
Actor, Util
Defined in:
lib/sidekiq/fetch.rb

Overview

The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.

Constant Summary collapse

TIMEOUT =
1

Constants included from Util

Util::EXPIRY

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Actor

included

Methods included from Util

#hostname, #logger, #process_id, #redis, #watchdog

Constructor Details

#initialize(mgr, options) ⇒ Fetcher

Returns a new instance of Fetcher.



15
16
17
18
19
# File 'lib/sidekiq/fetch.rb', line 15

def initialize(mgr, options)
  @down = nil
  @mgr = mgr
  @strategy = Fetcher.strategy.new(options)
end

Class Method Details

.done!Object

Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.



68
69
70
# File 'lib/sidekiq/fetch.rb', line 68

def self.done!
  @done = true
end

.done?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/sidekiq/fetch.rb', line 72

def self.done?
  @done
end

.strategyObject



76
77
78
# File 'lib/sidekiq/fetch.rb', line 76

def self.strategy
  Sidekiq.options[:fetch] || BasicFetch
end

Instance Method Details

#fetchObject

Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.

Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sidekiq/fetch.rb', line 29

def fetch
  watchdog('Fetcher#fetch died') do
    return if Sidekiq::Fetcher.done?

    begin
      work = @strategy.retrieve_work
      ::Sidekiq.logger.info("Redis is online, #{Time.now.to_f - @down.to_f} sec downtime") if @down
      @down = nil

      if work
        @mgr.async.assign(work)
      else
        after(0) { fetch }
      end
    rescue => ex
      handle_exception(ex)
    end

  end
end

#handle_exception(ex) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/sidekiq/fetch.rb', line 50

def handle_exception(ex)
  if !@down
    logger.error("Error fetching message: #{ex}")
    ex.backtrace.each do |bt|
      logger.error(bt)
    end
  end
  @down ||= Time.now
  sleep(TIMEOUT)
  after(0) { fetch }
rescue Task::TerminatedError
  # If redis is down when we try to shut down, all the fetch backlog
  # raises these errors.  Haven't been able to figure out what I'm doing wrong.
end