Class: Resque::Forker

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/forker.rb,
lib/resque/forker/version.rb

Overview

Loading Rails, the application and all its dependencies takes significant time and eats up memory. We keep startup time manageable by loading the application once and forking the worker processes. When using REE, this will also keep memory usage low. Note that we can’t reuse connections and file handles in child processes, so no saving on opening database connections, etc.

To use this library, wrap your setup and teardown blocks:

Resque.setup do |forker|
  $:.unshift File.dirname(__FILE__) + "/.."
  require "config/environment"
  ActiveRecord::Base.connection.disconnect!
  forker.logger = Rails.logger
  forker.user "nobody", "nobody"
  forker.workload = ["*"] * 8
  forker.options.interval = 1
end
Resque.before_first_fork do
  ActiveRecord::Base.establish_connection
end

Most libraries cannot share connections between child processes, you want to close these in the parent process (during setup) and reopen connections for each worker when it needs it to process a job (during after_work). This example shows how to do that for ActiveRecord, you will need to do the same for other libraries, e.g. MongoMapper, Vanity.

All the forking action is handled by a single call:

# Three workers, processing all queues
Resque.fork! :workload=>["*"] * 3

The workload is specified as an array of lists of queues, that way you can decide how many workers to fork (length of the array) and give each worker different set of queues to work with. For example, to have four workers processing import queue, and only two of these also processing export queue:

forker.workload = ["import,export", "import"] * 2

Once the process is up and running, you control it by sending signals:

  • kill -QUIT – Quit gracefully

  • kill -TERM – Terminate immediately

  • kill -USR2 – Suspend all workers (e.g when running rake db:migrate)

  • kill -CONT – Resume suspended workers

  • kill -HUP – Shutdown and restart

The HUP signal will wait for all existing jobs to complete, run the teardown block, and reload the script with the same environment and arguments. It will reload the application, the libraries, and of course any configuration changes in this script (e.g. changes to the workload).

The reloaded process keeps the same PID so you can use it with upstart:

reload workers

The upstart script could look something like this:

start on runlevel [2345]
stop on runlevel [06]
chdir /var/app/current
env RAILS_ENV=production
exec script/workers
respawn

Defined Under Namespace

Classes: Options

Constant Summary collapse

VERSION =
"2.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Forker

Returns a new instance of Forker.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/resque/forker.rb', line 69

def initialize(options = nil)
  options ||= {}
  @logger = options[:logger] || Logger.new($stderr)
  @workload = options[:workload] || ["*"]
  @options = Options[*options.values_at(*Options.members)]
  @children = []
  begin
    require "system_timer"
    @timeout = SystemTimer.method(:timeout_after)
  rescue NameError, LoadError
    require "timeout"
    @timeout = method(:timeout)
  end
end

Instance Attribute Details

#loggerObject

Defaults to stderr, but you may want to point this at Rails logger.



89
90
91
# File 'lib/resque/forker.rb', line 89

def logger
  @logger
end

#optionsObject (readonly)

Most options can be changed from the Resque.setup hook. See Options.



92
93
94
# File 'lib/resque/forker.rb', line 92

def options
  @options
end

#workloadObject

Workload is an array of queue sets, one entry per workers (so four entries if you want four workers). Each entry is comma-separated queue names.



86
87
88
# File 'lib/resque/forker.rb', line 86

def workload
  @workload
end

Instance Method Details

#runObject

Run and never return.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/resque/forker.rb', line 95

def run
  @logger.info "** Running as #{Process.pid}"
  setup_signals
  if setup = Resque.setup
    @logger.info "** Loading application ..."
    setup.call self
  end
  reap_children
  @logger.info "** Forking workers"
  enable_gc_optimizations
  # Serious forking action.
  @workload.each do |queues|
    @children << fork { run_worker queues }
  end
rescue
  @logger.error "** Failed to load application: #{$!.message}"
  @logger.error $!.backtrace.join("\n")
ensure
  # Sleep forever.
  sleep 5 while true
end

#user(user, group) ⇒ Object

Change ownership of this process.



118
119
120
121
122
123
124
125
126
# File 'lib/resque/forker.rb', line 118

def user(user, group)
  uid = Etc.getpwnam(user).uid
  gid = Etc.getgrnam(group).gid
  if Process.euid != uid || Process.egid != gid
    Process.initgroups user, gid
    Process::GID.change_privilege gid
    Process::UID.change_privilege uid
  end
end