Class: Pace::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pace/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/pace/worker.rb', line 5

# Builds a worker bound to a queue and resolves its Redis connection settings.
#
# The options hash is copied before use, so the caller's hash is never
# modified. Explicit :host, :port, :password and :db entries take precedence
# over the corresponding parts of the Redis URL.
#
# @param options [Hash] worker and connection settings
# @option options [String] :queue queue name; falls back to ENV["PACE_QUEUE"]
# @option options [Object] :namespace stored as-is in @namespace
# @option options [String] :url Redis URL; falls back to ENV["PACE_REDIS"],
#   then to "redis://127.0.0.1:6379/0"
# @raise [ArgumentError] if no queue name is available or it is empty
def initialize(options = {})
  @options   = options.dup
  @queue     = @options.delete(:queue) || ENV["PACE_QUEUE"]
  @namespace = @options.delete(:namespace)

  if @queue.nil? || @queue.empty?
    raise ArgumentError, "Queue unspecified -- pass a queue name or set PACE_QUEUE"
  end

  # fully_qualified_queue is defined elsewhere in this class; it is called
  # after @namespace has been assigned.
  @queue = fully_qualified_queue(@queue)

  url = URI(@options.delete(:url) || ENV["PACE_REDIS"] || "redis://127.0.0.1:6379/0")

  # URI#password returns the userinfo exactly as written in the URL, so a
  # password such as "p%40ss" must be percent-decoded before it is used.
  password = url.password
  password = URI::DEFAULT_PARSER.unescape(password) if password

  @options[:host]     ||= url.host
  @options[:port]     ||= url.port
  @options[:password] ||= password
  # The URL path ("/2") selects the database; a missing path yields 0.
  @options[:db]       ||= url.path[1..-1].to_i
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/pace/worker.rb', line 3

# Reader for the @queue instance variable.
#
# @return [Object] the current value of @queue
def queue; @queue; end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



3
4
5
# File 'lib/pace/worker.rb', line 3

# Reader for the @redis instance variable.
#
# @return [Object] the current value of @redis
def redis; @redis; end

Instance Method Details

#enqueue(queue, klass, *args, &block) ⇒ Object



50
51
52
53
54
# File 'lib/pace/worker.rb', line 50

# Serializes a job description to JSON and pushes it onto a queue with RPUSH.
#
# The queue name is first passed through fully_qualified_queue (defined
# elsewhere in this class). The payload is a JSON object with a "class" key
# (klass converted with #to_s) and an "args" key (the remaining arguments).
#
# @param queue [Object] queue name, resolved via fully_qualified_queue
# @param klass [#to_s] job class or class name
# @param args [Array] job arguments; must be JSON-serializable
# @param block [Proc] optional block forwarded to @redis.rpush
# @return [Object] whatever @redis.rpush returns
def enqueue(queue, klass, *args, &block)
  destination = fully_qualified_queue(queue)
  description = { :class => klass.to_s, :args => args }
  payload     = description.to_json
  @redis.rpush(destination, payload, &block)
end

#on_error(&callback) ⇒ Object



46
47
48
# File 'lib/pace/worker.rb', line 46

# Stores the given block in @error_callback, replacing any previous value.
# Calling it without a block stores nil.
#
# @param callback [Proc] block to keep as the error callback
# @return [Proc, nil] the stored callback
def on_error(&callback); @error_callback = callback; end

#shutdown ⇒ Object



41
42
43
44
# File 'lib/pace/worker.rb', line 41

# Logs a shutdown message and asks EventMachine to stop its event loop.
#
# @return [Object] whatever EventMachine.stop_event_loop returns
def shutdown
  Pace.logger.info("Shutting down")
  EventMachine.stop_event_loop
end

#start(&block) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/pace/worker.rb', line 24

# Starts the worker: remembers the job block, installs signal handlers and
# enters the EventMachine loop.
#
# Inside the loop a periodic timer (every Pace::LoadAverage::INTERVAL)
# recomputes the load averages and logs the contents of the $load global.
# The Redis connection is then opened from @options and the first job fetch
# is triggered. register_signal_handlers and fetch_next_job are defined
# elsewhere in this class.
#
# @param block [Proc] block stored in @block
# @return [Object] whatever EventMachine.run returns
def start(&block)
  @block = block

  Pace.logger.info("Starting up")
  register_signal_handlers

  EventMachine.run do
    EventMachine.add_periodic_timer(Pace::LoadAverage::INTERVAL) do
      Pace::LoadAverage.compute
      Pace.logger.info("load averages: #{$load.join(' ')}")
    end

    @redis = EventMachine::Protocols::Redis.connect(@options)
    fetch_next_job
  end
end