Class: Gongren::Rails::Worker

Inherits:
Worker
  • Object
show all
Defined in:
lib/gongren/rails/worker.rb

Instance Method Summary collapse

Instance Method Details

#active_record_model(ar) ⇒ Object

Given a two element Array of [“ClassName”, ID], returns an ActiveRecord instance.



65
66
67
# File 'lib/gongren/rails/worker.rb', line 65

def active_record_model(ar)
  ar.first.constantize.find(ar.last)
end

#startObject

Raises:

  • (ArgumentError)


4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/gongren/rails/worker.rb', line 4

def start
  raise ArgumentError, "#start must be called with a block" unless block_given?

  logger.info { "Gongren::RailsWorker #{worker_id} ready to work" }

  AMQP.start do
    MQ.queue(control_queue_name, control_queue_options).bind(control_exchange_name, control_exchange_options) do |header, data|
      message = Marshal.load(data)
      logger.info { message.inspect }

      if message[:selector].to_s.strip.empty? then
        logger.error { "Received control request without :selector key: ignoring" }
      else
        begin
          send(message[:selector], message)
        rescue Exception => e
          log_failure(header, message, e)
        end
      end
    end

    MQ.queue(queue_name, queue_options).bind(exchange_name, exchange_options).subscribe(:ack => true) do |header, data|
      message = Marshal.load(data)
      class << message; include Unit; end # Dynamically add our #ack method
      message.gongren_header = header

      logger.info { message.inspect }
      receiver = case keys = message[:receiver].keys
                 when [:active_record]
                   active_record_model(message[:receiver][:active_record])
                 when [:class]
                   message[:receiver][:class].constantize
                 else
                   raise UnknownReceiverKeys, "Unable to map from #{keys} to a receiver instance: aborting"
                 end
      message.args.collect! do |arg|
        if arg.kind_of?(Hash) && arg[:active_record] then
          active_record_model(arg[:active_record])
        else
          arg
        end
      end
      logger.debug { "Mapped message is:\n#{message.inspect}" }

      begin
        receiver.send(message[:selector], *message[:args])

        # Automatically ack messages, but do it only once
        logger.debug { "Block ack'd? #{message.acked?}" }
        unless message.acked?
          logger.debug { "Ack'ing for the block" }
          message.ack
        end
      rescue Exception => e
        log_failure(header, message, e)
      end
    end
  end
end