Class: Woodhouse::Runners::BunnyRunner

Inherits:
Woodhouse::Runner show all
Includes:
Celluloid
Defined in:
lib/woodhouse/runners/bunny_runner.rb

Instance Method Summary collapse

Methods inherited from Woodhouse::Runner

#current_status, #initialize

Constructor Details

This class inherits a constructor from Woodhouse::Runner

Instance Method Details

#bail_out(err) ⇒ Object

Raises:



44
45
46
# File 'lib/woodhouse/runners/bunny_runner.rb', line 44

def bail_out(err)
  raise Woodhouse::BailOut, "#{err.class}: #{err.message}"
end

#make_job(properties, payload) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/woodhouse/runners/bunny_runner.rb', line 52

def make_job(properties, payload)
  Woodhouse::Job.new(@worker.worker_class_name, @worker.job_method) do |job|
    args = properties.headers
    job.arguments = args
    job.payload = payload
  end
end

#spin_downObject



48
49
50
# File 'lib/woodhouse/runners/bunny_runner.rb', line 48

def spin_down
  signal :spin_down
end

#subscribeObject



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/woodhouse/runners/bunny_runner.rb', line 6

def subscribe
  bunny = Bunny.new(@config.server_info)
  bunny.start
  channel = bunny.create_channel
  channel.prefetch(1)
  queue = channel.queue(@worker.queue_name)
  exchange = channel.exchange(@worker.exchange_name, :type => :headers)
  queue.bind(exchange, :arguments => @worker.criteria.amqp_headers)
  worker = Celluloid.current_actor
  queue.subscribe(:ack => true, :block => false) do |delivery, props, payload|
    begin
      job = make_job(props, payload)
      if can_service_job?(job)
        if service_job(job)
          channel.acknowledge(delivery.delivery_tag, false)
        else
          channel.reject(delivery.delivery_tag, false)
        end
      else
        @config.logger.error("Cannot service job #{job.describe} in queue for #{@worker.describe}")
        channel.reject(delivery.delivery_tag, false) 
      end
    rescue => err
      begin
        @config.logger.error("Error bubbled up out of worker. This shouldn't happen. #{err.message}")
        err.backtrace.each do |btr|
          @config.logger.error("  #{btr}")
        end
        # Don't risk grabbing this job again. 
        channel.reject(delivery.delivery_tag, false)
      ensure
        worker.bail_out(err)
      end
    end
  end
  wait :spin_down
end