Class: Gongren::Worker
- Inherits:
-
Object
- Object
- Gongren::Worker
- Defined in:
- lib/gongren/worker.rb
Overview
A worker does work. Units of work are received and processed locally. All units of work will be acknowledged back to the server, ensuring units of work are executed exactly once.
Units of work are received as Hashes from the Server. The Hash that is passed to the #run block will be dynamically injected with Unit, which includes the Unit#ack method. If the block returns and the message hasn’t been acknowledged, it will be for you.
Notes on use
If you do any database work, it is important to wrap your work in a transaction, because if your worker dies, the same work unit will be resubmitted.
Defined Under Namespace
Modules: Unit
Class Method Summary collapse
-
.run(options = {}, &block) ⇒ Object
A quick way to run a worker.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
37 38 39 40 |
# File 'lib/gongren/worker.rb', line 37 def initialize(={}) @options = .inject(Hash.new) {|memo, (k,v)| memo[k.to_sym] = v; memo} # #symbolize_keys @logger = [:logger] || Logger.new([:log] || STDERR) end |
Class Method Details
.run(options = {}, &block) ⇒ Object
A quick way to run a worker. Creates an instance with the options and runs the block whenever a message is received, passing the exact object that was sent from the server.
88 89 90 |
# File 'lib/gongren/worker.rb', line 88 def self.run(={}, &block) new().run(&block) end |
Instance Method Details
#run ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/gongren/worker.rb', line 42 def run raise ArgumentError, "#run must be called with a block" unless block_given? logger.info { "Gongren::Worker #{worker_id} ready to work" } EM.run do MQ.queue(control_queue_name, ).bind(control_exchange_name, ) do |header, data| = Marshal.load(data) logger.info { .inspect } if [:selector].to_s.strip.empty? then logger.error { "Received control request without :selector key: ignoring" } else begin send([:selector], ) rescue Exception => e log_failure(header, , e) end end end MQ.queue(queue_name, ).bind(exchange_name, ).subscribe do |header, data| = Marshal.load(data) class << ; include Unit; end # Dynamically add our #ack method .gongren_header = header logger.info { .inspect } begin yield # Automatically ack messages, but do it only once logger.debug { "Block ack'd? #{.acked?}" } unless .acked? logger.debug { "Ack'ing for the block" } .ack end rescue Exception => e log_failure(header, , e) end end end end |