Class: EventMachine::Resque::WorkerMachine

Inherits:
Object
  • Object
show all
Defined in:
lib/em-resque/worker_machine.rb

Overview

WorkerMachine is an EventMachine with Resque workers wrapped in Ruby fibers.

An instance contains the workers and a system monitor running inside an EventMachine. The monitoring takes care of stopping the machine when all workers are shut down.

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ WorkerMachine

Initializes the machine, creates the fibers and workers, traps quit signals and prunes dead workers

Options

fibers

The number of fibers to use in the worker (default 1)

interval

Time in seconds how often the workers check for new work (default 5)

queues

Which queues to poll (default all)

verbose

Verbose log output (default false)

vverbose

Even more verbose log output (default false)

pidfile

The file to save the process id number

tick_instead_of_sleep

Whether to tick through the reactor polling for jobs or use EM::Synchrony.sleep. Note that if you use this option, you’ll be limited to 1 fiber.

Raises:

  • (ArgumentError)


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/em-resque/worker_machine.rb', line 28

def initialize(opts = {})
  @interval = opts[:interval] || 5
  @fibers_count = opts[:fibers] || 1
  @queues = opts[:queue] || opts[:queues] || '*'
  @verbose = opts[:logging] || opts[:verbose] || false
  @very_verbose = opts[:vverbose] || false
  @pidfile = opts[:pidfile]
  @redis_namespace = opts[:namespace] || :resque
  @redis_uri = opts[:redis] || "redis://127.0.0.1:6379"
  @tick_instead_of_sleep = !opts[:tick_instead_of_sleep].nil? ? opts[:tick_instead_of_sleep] : false

  # If we're ticking instead of sleeping, we can only have one fiber
  if @tick_instead_of_sleep
    @fibers_count = 1
  end

  raise(ArgumentError, "Should have at least one fiber") if @fibers_count.to_i < 1

  build_workers
  build_fibers
  create_pidfile
end

Instance Method Details

#fibersObject



71
72
73
# File 'lib/em-resque/worker_machine.rb', line 71

def fibers
  @fibers || []
end

#startObject

Start the machine and start polling queues.



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/em-resque/worker_machine.rb', line 52

def start
  EM.synchrony do
    EM::Resque.initialize_redis(@redis_uri, @redis_namespace, @fibers_count)
    trap_signals
    prune_dead_workers
    @fibers.each(&:resume)

    # If we're ticking and not sleeping, we don't need to monitor for yielding
    unless @tick_instead_of_sleep
      system_monitor.resume
    end
  end
end

#stopObject

Stop the machine.



67
68
69
# File 'lib/em-resque/worker_machine.rb', line 67

def stop
  @workers.each(&:shutdown)
end

#workersObject



75
76
77
# File 'lib/em-resque/worker_machine.rb', line 75

def workers
  @workers || []
end