Class: Creeper::Fetcher

Inherits:
Object
Includes:
Celluloid, Util
Defined in:
lib/creeper/fetch.rb

Overview

The Fetcher blocks on Beanstalk, waiting up to TIMEOUT seconds to reserve a job. It decodes the job body into a queue name and a message, then hands them to the Manager to assign to a ready Processor.

Constant Summary

TIMEOUT = 1

Constants included from Util

Util::EXPIRY

Class Method Summary

  • .done! ⇒ Object
  • .done? ⇒ Boolean

Instance Method Summary

  • #fetch ⇒ Object
  • #initialize(mgr, queues, strict) ⇒ Fetcher (constructor)

Methods included from Util

#beanstalk, #constantize, #logger, #process_id, #redis, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(mgr, queues, strict) ⇒ Fetcher

Returns a new instance of Fetcher.



# File 'lib/creeper/fetch.rb', line 15

def initialize(mgr, queues, strict)
  @mgr = mgr
  @strictly_ordered_queues = strict
  @queues = queues.map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end
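
To make the two queue lists concrete, the sketch below applies the same `map` and `uniq` calls to a made-up input. The queue names `high` and `default` are illustrative assumptions and do not come from Creeper itself.

```ruby
# Hypothetical input: these queue names are illustrative only.
queues = %w[high high default]

# The same two transformations that Fetcher#initialize performs.
prefixed = queues.map { |q| "queue:#{q}" }  # duplicates are kept
unique   = prefixed.uniq                    # duplicates are dropped

p prefixed # ["queue:high", "queue:high", "queue:default"]
p unique   # ["queue:high", "queue:default"]
```

So `@queues` preserves repeated entries in their original order, while `@unique_queues` holds each prefixed name once.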

Class Method Details

.done! ⇒ Object

Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.



# File 'lib/creeper/fetch.rb', line 72

def self.done!
  @done = true
end

.done? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/creeper/fetch.rb', line 76

def self.done?
  @done
end
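
The flag lives in an instance variable on the class object itself, so it is shared by every Fetcher instance. A minimal sketch of that pattern, using a hypothetical stand-in class `FlagDemo` rather than the real Fetcher:

```ruby
# Stand-in class (hypothetical); it mirrors only the done!/done? pair.
class FlagDemo
  def self.done!
    @done = true   # instance variable of the class object, not of an instance
  end

  def self.done?
    @done
  end
end

before_flag = FlagDemo.done?  # nil: the variable has not been set yet
FlagDemo.done!
after_flag = FlagDemo.done?   # true
```

Before `done!` is called the reader returns `nil` rather than `false`. That is still falsy, so a guard such as `return if FlagDemo.done?` behaves as expected.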

Instance Method Details

#fetch ⇒ Object

Fetching is straightforward: the Manager makes a fetch request for each idle Processor when Creeper starts, and then issues a new fetch request every time a Processor finishes a message.

Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
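
The bounded-wait-then-reschedule idea can be sketched in plain Ruby without Celluloid or Beanstalk. Everything here is an assumption made for illustration: the `PollingFetcher` class, its in-memory array source (where `nil` stands for "nothing arrived before the timeout"), and the `stop!` flag. The driver loop plays the role of whatever schedules the next fetch.

```ruby
# Sketch only: a stand-in for the pattern, not Creeper's implementation.
class PollingFetcher
  attr_reader :attempts, :handled

  def initialize(source)
    @source   = source  # Array standing in for the job server
    @done     = false
    @attempts = 0
    @handled  = []
  end

  def stop!
    @done = true
  end

  # One bounded attempt. It never loops or blocks indefinitely; the caller
  # decides whether to schedule another attempt based on the return value.
  def fetch_once
    return :stopped if @done

    @attempts += 1
    msg = @source.shift      # nil simulates a timed-out wait
    return :retry if msg.nil?

    @handled << msg
    :assigned
  end
end

fetcher = PollingFetcher.new([nil, "job-1", nil, "job-2"])

results = []
5.times { results << fetcher.fetch_once }  # the "scheduler" re-issues fetches
fetcher.stop!
results << fetcher.fetch_once              # ignored once shutdown is flagged
```

Because each attempt returns promptly, the shutdown flag is checked between attempts and a stop request takes effect within roughly one timeout period.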



# File 'lib/creeper/fetch.rb', line 30

def fetch
  watchdog('Fetcher#fetch died') do
    # Stop rescheduling once shutdown has been flagged.
    return if Creeper::Fetcher.done?

    begin
      queue = nil
      msg   = nil
      job   = nil
      conn  = nil

      conn = Creeper::BeanstalkConnection.create

      begin
        # Wait at most TIMEOUT seconds for a job, then decode its body.
        job = conn.reserve(TIMEOUT)
        queue, msg = Creeper.load_json(job.body)
      rescue Beanstalk::TimedOut
        logger.debug("No message fetched after #{TIMEOUT} seconds") if $DEBUG
        # job is normally still nil here; the inline rescue swallows that.
        job.release rescue nil
        conn.close rescue nil
        sleep(TIMEOUT)
        return after(0) { fetch }
      end

      if msg
        # Pass the message, the bare queue name, the job and the connection on.
        @mgr.assign!(msg, queue.gsub(/.*queue:/, ''), job, conn)
      else
        after(0) { fetch }
      end
    rescue => ex
      logger.error("Error fetching message: #{ex}")
      logger.error(ex.backtrace.first)
      # Put the job back and drop the connection before retrying.
      job.release rescue nil
      conn.close rescue nil
      sleep(TIMEOUT)
      after(0) { fetch }
    end
  end
end
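
Before handing the message to the Manager, `fetch` strips the queue prefix with `gsub(/.*queue:/, '')`. The sketch below shows how that pattern behaves on a few made-up names; none of them are taken from Creeper.

```ruby
# Hypothetical queue names, for illustration only.
names = ["queue:default", "myapp:queue:mailers", "queue:queue:odd", "plain"]

# Same expression as in Fetcher#fetch. The leading `.*` is greedy, so the
# match extends through the last "queue:" occurrence in the string.
stripped = names.map { |n| n.gsub(/.*queue:/, '') }

p stripped # ["default", "mailers", "odd", "plain"]
```

Anything in front of the prefix, such as a namespace, is removed along with it. A name with no `queue:` prefix passes through unchanged.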