Class: Creeper::Fetcher
- Inherits:
-
Object
- Object
- Creeper::Fetcher
- Includes:
- Celluloid, Util
- Defined in:
- lib/creeper/fetch.rb
Overview
The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.
Constant Summary collapse
- TIMEOUT =
1
Constants included from Util
Class Method Summary collapse
Instance Method Summary collapse
-
#fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
-
#initialize(mgr, queues, strict) ⇒ Fetcher
constructor
A new instance of Fetcher.
Methods included from Util
#beanstalk, #constantize, #logger, #process_id, #redis, #watchdog
Methods included from ExceptionHandler
Constructor Details
#initialize(mgr, queues, strict) ⇒ Fetcher
Returns a new instance of Fetcher.
15 16 17 18 19 20 |
# File 'lib/creeper/fetch.rb', line 15 def initialize(mgr, queues, strict) @mgr = mgr @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } @unique_queues = @queues.uniq end |
Class Method Details
.done! ⇒ Object
Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.
72 73 74 |
# File 'lib/creeper/fetch.rb', line 72 def self.done! @done = true end |
.done? ⇒ Boolean
76 77 78 |
# File 'lib/creeper/fetch.rb', line 76 def self.done? @done end |
Instance Method Details
#fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/creeper/fetch.rb', line 30 def fetch watchdog('Fetcher#fetch died') do return if Creeper::Fetcher.done? begin queue = nil msg = nil job = nil conn = nil conn = Creeper::BeanstalkConnection.create begin job = conn.reserve(TIMEOUT) queue, msg = Creeper.load_json(job.body) rescue Beanstalk::TimedOut logger.debug("No message fetched after #{TIMEOUT} seconds") if $DEBUG job.release rescue nil conn.close rescue nil sleep(TIMEOUT) return after(0) { fetch } end if msg @mgr.assign!(msg, queue.gsub(/.*queue:/, ''), job, conn) else after(0) { fetch } end rescue => ex logger.error("Error fetching message: #{ex}") logger.error(ex.backtrace.first) job.release rescue nil conn.close rescue nil sleep(TIMEOUT) after(0) { fetch } end end end |