Class: Gongren::Worker

Inherits:
Object
Defined in:
lib/gongren/worker.rb

Overview

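A worker does work. Units of work are received and processed locally. Every unit of work is acknowledged back to the server, with the goal that each unit is executed exactly once. This is not a strict guarantee: a unit whose worker dies before acknowledging it is resubmitted, so it can run more than once.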

Units of work are received as Hashes from the server. The Hash passed to the #run block is dynamically extended with the Unit module, which provides the Unit#ack method. If the block returns without having acknowledged the message, the worker acknowledges it for you.
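
The injection and automatic acknowledgement described above can be sketched without a broker. This is a minimal illustration under stated assumptions, not the library's implementation: `SketchUnit` and `process` are hypothetical names, and the real Unit module presumably acknowledges through the AMQP header rather than by setting a flag.

```ruby
# Hypothetical stand-in for Gongren::Worker::Unit. It only records a flag;
# the real module is assumed to acknowledge through the AMQP header.
module SketchUnit
  def ack
    @acked = true
  end

  def acked?
    @acked ? true : false
  end
end

# Extends the received Hash, yields it, then acks if the block did not.
# Returns true when the ack was performed on the block's behalf.
def process(message)
  message.extend(SketchUnit)
  yield message
  auto = !message.acked?
  message.ack if auto
  auto
end

explicit = process({:id => 1}) { |unit| unit.ack }    # block acks itself
implicit = process({:id => 2}) { |unit| unit[:id] }   # block never acks
```

In this sketch `explicit` is false because the block acknowledged the unit itself, while `implicit` is true because the acknowledgement was issued after the block returned.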

Notes on use

If you do any database work, it is important to wrap your work in a transaction, because if your worker dies, the same work unit will be resubmitted.

Examples:


# In a Rails context, this would live in script/gongren-worker:
require File.dirname(__FILE__) + "/../config/environment"
require "gongren/worker"

Gongren::Worker.run do |unit|
  ActiveRecord::Base.transaction do
    klass_name = unit[:class_name]
    klass      = klass_name.constantize
    instance   = klass.find(unit[:id])
    results    = instance.send(unit[:selector], *unit[:args])
  end

  # We don't have two-phase commit yet, thus acknowledging outside the
  # transaction might execute a message twice.
  unit.ack
end

Defined Under Namespace

Modules: Unit

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/gongren/worker.rb', line 37

def initialize(options={})
  @options = options.inject(Hash.new) {|memo, (k,v)| memo[k.to_sym] = v; memo} # #symbolize_keys
  @logger  = options[:logger] || Logger.new(options[:log] || STDERR)
end
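
The `inject` call in the constructor is a hand-rolled `symbolize_keys`. The following sketch extracts that one idiom so it can be tried in isolation; `symbolize_keys` as a standalone method and the sample option values are illustrative assumptions, not part of the library.

```ruby
# Same inject idiom as in #initialize, extracted for illustration only.
def symbolize_keys(options)
  options.inject(Hash.new) { |memo, (k, v)| memo[k.to_sym] = v; memo }
end

# String and symbol keys both end up as symbols.
opts = symbolize_keys("log" => "worker.log", :logger => nil)
```

Note that, as listed, the `@logger` line reads from the original `options` argument rather than from `@options`, so `:logger` and `:log` appear to take effect only when passed with symbol keys.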

Class Method Details

.run(options = {}, &block) ⇒ Object

A quick way to run a worker. Creates an instance with the options and runs the block whenever a message is received, passing the exact object that was sent from the server.



# File 'lib/gongren/worker.rb', line 88

def self.run(options={}, &block)
  new(options).run(&block)
end

Instance Method Details

#run ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/gongren/worker.rb', line 42

def run
  raise ArgumentError, "#run must be called with a block" unless block_given?

  logger.info { "Gongren::Worker #{worker_id} ready to work" }

  EM.run do
    MQ.queue(control_queue_name, control_queue_options).bind(control_exchange_name, control_exchange_options) do |header, data|
      message = Marshal.load(data)
      logger.info { message.inspect }

      if message[:selector].to_s.strip.empty? then
        logger.error { "Received control request without :selector key: ignoring" }
      else
        begin
          send(message[:selector], message)
        rescue Exception => e
          log_failure(header, message, e)
        end
      end
    end

    MQ.queue(queue_name, queue_options).bind(exchange_name, exchange_options).subscribe do |header, data|
      message = Marshal.load(data)
      class << message; include Unit; end # Dynamically add our #ack method
      message.gongren_header = header

      logger.info { message.inspect }

      begin
        yield message

        # Automatically ack messages, but do it only once
        logger.debug { "Block ack'd? #{message.acked?}" }
        unless message.acked?
          logger.debug { "Ack'ing for the block" }
          message.ack
        end
      rescue Exception => e
        log_failure(header, message, e)
      end
    end
  end
end
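
The control-queue branch above unmarshals each payload and dispatches on its `:selector` key. The sketch below reproduces only that step, with no EventMachine or broker involved. `SketchReceiver` is a hypothetical class, and `:ping` is an invented selector used for illustration; the source does not say which control selectors the worker actually responds to.

```ruby
# Hypothetical receiver standing in for the worker. :ping is an invented
# selector, not one the library is known to define.
class SketchReceiver
  attr_reader :log

  def initialize
    @log = []
  end

  def ping(message)
    @log << [:ping, message[:note]]
  end

  # Mirrors the control-queue branch: unmarshal, validate, dispatch.
  def handle(data)
    message = Marshal.load(data)
    if message[:selector].to_s.strip.empty?
      @log << [:ignored, nil]   # the worker logs an error here instead
    else
      send(message[:selector], message)
    end
  end
end

receiver = SketchReceiver.new
receiver.handle(Marshal.dump({:selector => :ping, :note => "hello"}))
receiver.handle(Marshal.dump({:note => "no selector"}))
```

Because `Marshal.load` can instantiate arbitrary objects and `send` invokes whatever method the payload names, a design like this assumes that only trusted producers can publish to the queues.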