Class: Pace::Worker
- Inherits:
-
Object
- Object
- Pace::Worker
- Defined in:
- lib/pace/worker.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary collapse
- #enqueue(queue, klass, *args, &block) ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #on_error(&callback) ⇒ Object
- #shutdown ⇒ Object
- #start(&block) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/pace/worker.rb', line 5 def initialize( = {}) @options = .dup @queue = @options.delete(:queue) || ENV["PACE_QUEUE"] @namespace = @options.delete(:namespace) if @queue.nil? || @queue.empty? raise ArgumentError.new("Queue unspecified -- pass a queue name or set PACE_QUEUE") end @queue = fully_qualified_queue(@queue) url = URI(@options.delete(:url) || ENV["PACE_REDIS"] || "redis://127.0.0.1:6379/0") @options[:host] ||= url.host @options[:port] ||= url.port @options[:password] ||= url.password @options[:db] ||= url.path[1..-1].to_i end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
3 4 5 |
# File 'lib/pace/worker.rb', line 3 def queue @queue end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
3 4 5 |
# File 'lib/pace/worker.rb', line 3 def redis @redis end |
Instance Method Details
#enqueue(queue, klass, *args, &block) ⇒ Object
50 51 52 53 54 |
# File 'lib/pace/worker.rb', line 50 def enqueue(queue, klass, *args, &block) queue = fully_qualified_queue(queue) job = {:class => klass.to_s, :args => args}.to_json @redis.rpush(queue, job, &block) end |
#on_error(&callback) ⇒ Object
46 47 48 |
# File 'lib/pace/worker.rb', line 46 def on_error(&callback) @error_callback = callback end |
#shutdown ⇒ Object
41 42 43 44 |
# File 'lib/pace/worker.rb', line 41 def shutdown Pace.logger.info "Shutting down" EM.stop_event_loop end |
#start(&block) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/pace/worker.rb', line 24 def start(&block) @block = block Pace.logger.info "Starting up" register_signal_handlers EM.run do EventMachine::add_periodic_timer(Pace::LoadAverage::INTERVAL) do Pace::LoadAverage.compute Pace.logger.info("load averages: #{$load.join(' ')}") end @redis = EM::Protocols::Redis.connect(@options) fetch_next_job end end |