Class: Resqued::Listener
- Inherits:
-
Object
- Object
- Resqued::Listener
- Includes:
- Logging, ProclineVersion, Sleepy
- Defined in:
- lib/resqued/listener.rb
Overview
A listener process. Watches resque queues and forks workers.
Constant Summary collapse
- SIGNALS =
[:CONT, :QUIT, :INT, :TERM].freeze
- ALL_SIGNALS =
SIGNALS + [:CHLD, :HUP]
- SIGNAL_QUEUE =
rubocop: disable Style/MutableConstant
[]
Instance Attribute Summary collapse
-
#workers ⇒ Object
readonly
Private: all available workers.
Class Method Summary collapse
-
.exec! ⇒ Object
Public: Given args from #exec, start this listener.
Instance Method Summary collapse
-
#burn_down_workers(signal) ⇒ Object
Private: make sure all the workers stop.
-
#check_for_expired_workers ⇒ Object
Private: Check if master reports any dead workers.
-
#exec ⇒ Object
Public: As an alternative to #run, exec a new ruby instance for this listener.
-
#finish_worker(worker_pid, status) ⇒ Object
Private.
-
#info ⇒ Object
Private.
-
#init_workers(config) ⇒ Object
Private.
-
#initialize(options) ⇒ Listener
constructor
Configure a new listener object.
-
#kill_all(signal) ⇒ Object
Private: send a signal to all the workers.
-
#my_workers ⇒ Object
Private: just the workers running as children of this listener.
-
#partition_workers ⇒ Object
Private: Split the workers into [not-running, running].
-
#reap_workers(waitpidflags = 0) ⇒ Object
Private: Check for workers that have stopped running.
-
#report_to_master(status) ⇒ Object
Private: Report child process status.
-
#run ⇒ Object
Public: Run the main loop.
-
#run_workers_run ⇒ Object
Private.
-
#running_workers ⇒ Object
Private: just the running workers.
-
#set_default_resque_logger ⇒ Object
Private.
-
#start_idle_workers ⇒ Object
Private.
-
#write_procline(status) ⇒ Object
Private.
-
#yawn(sleep_time = nil) ⇒ Object
Private.
Methods included from Sleepy
Methods included from ProclineVersion
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ Listener
Configure a new listener object.
Runs in the master process.
21 22 23 24 25 26 |
# File 'lib/resqued/listener.rb', line 21 def initialize() @config_paths = .fetch(:config_paths) @old_workers = .fetch(:old_workers) { [] }.freeze @socket = .fetch(:socket) @listener_id = .fetch(:listener_id) { nil } end |
Instance Attribute Details
#workers ⇒ Object (readonly)
Private: all available workers
149 150 151 |
# File 'lib/resqued/listener.rb', line 149 def workers @workers end |
Class Method Details
.exec! ⇒ Object
Public: Given args from #exec, start this listener.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/resqued/listener.rb', line 48 def self.exec! = {} if socket = ENV["RESQUED_SOCKET"] [:socket] = Socket.for_fd(socket.to_i) end if path = ENV["RESQUED_CONFIG_PATH"] [:config_paths] = path.split(":") end if state = ENV["RESQUED_STATE"] [:old_workers] = state.split("||").map { |s| Hash[[:pid, :queue_key].zip(s.split("|"))] } end if listener_id = ENV["RESQUED_LISTENER_ID"] [:listener_id] = listener_id end new().run end |
Instance Method Details
#burn_down_workers(signal) ⇒ Object
Private: make sure all the workers stop.
Resque workers have gaps in their signal-handling ability.
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/resqued/listener.rb', line 124 def burn_down_workers(signal) loop do check_for_expired_workers write_procline("shutdown") SIGNAL_QUEUE.clear break if :no_child == reap_workers(Process::WNOHANG) kill_all(signal) sleep 1 # Don't kill any more often than every 1s. yawn 5 end # One last time. reap_workers end |
#check_for_expired_workers ⇒ Object
Private: Check if master reports any dead workers.
192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/resqued/listener.rb', line 192 def check_for_expired_workers return unless @socket loop do IO.select([@socket], nil, nil, 0) or return line = @socket.readline finish_worker(line.to_i, nil) end rescue EOFError, Errno::ECONNRESET => e @socket = nil log "#{e.class.name} while reading from master" Process.kill(:QUIT, $$) end |
#exec ⇒ Object
Public: As an alternative to #run, exec a new ruby instance for this listener.
Runs in the master process.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/resqued/listener.rb', line 31 def exec socket_fd = @socket.to_i ENV["RESQUED_SOCKET"] = socket_fd.to_s ENV["RESQUED_CONFIG_PATH"] = @config_paths.join(":") ENV["RESQUED_STATE"] = @old_workers.map { |r| "#{r[:pid]}|#{r[:queue_key]}" }.join("||") ENV["RESQUED_LISTENER_ID"] = @listener_id.to_s ENV["RESQUED_MASTER_VERSION"] = Resqued::VERSION log "exec: #{Resqued::START_CTX['$0']} listener" exec_opts = { socket_fd => socket_fd } # Ruby 2.0 needs to be told to keep the file descriptor open during exec. if start_pwd = Resqued::START_CTX["pwd"] exec_opts[:chdir] = start_pwd end procline_buf = " " * 256 # make room for setproctitle Kernel.exec(Resqued::START_CTX["$0"], "listener", procline_buf, exec_opts) end |
#finish_worker(worker_pid, status) ⇒ Object
Private.
207 208 209 210 211 212 213 |
# File 'lib/resqued/listener.rb', line 207 def finish_worker(worker_pid, status) workers.each do |worker| if worker.pid == worker_pid worker.finished!(status) end end end |
#info ⇒ Object
Private.
263 264 265 |
# File 'lib/resqued/listener.rb', line 263 def info @info ||= RuntimeInfo.new end |
#init_workers(config) ⇒ Object
Private.
228 229 230 231 232 233 234 235 |
# File 'lib/resqued/listener.rb', line 228 def init_workers(config) @workers = config.build_workers @old_workers.each do |running_worker| if blocked_worker = @workers.detect { |worker| worker.idle? && worker.queue_key == running_worker[:queue_key] } blocked_worker.wait_for(running_worker[:pid].to_i) end end end |
#kill_all(signal) ⇒ Object
Private: send a signal to all the workers.
142 143 144 145 146 |
# File 'lib/resqued/listener.rb', line 142 def kill_all(signal) running = running_workers log "kill -#{signal} #{running.map { |r| r.pid }.inspect}" running.each { |worker| worker.kill(signal) } end |
#my_workers ⇒ Object
Private: just the workers running as children of this listener.
157 158 159 |
# File 'lib/resqued/listener.rb', line 157 def my_workers workers.select { |worker| worker.running_here? } end |
#partition_workers ⇒ Object
Private: Split the workers into [not-running, running]
162 163 164 |
# File 'lib/resqued/listener.rb', line 162 def partition_workers workers.partition { |worker| worker.idle? } end |
#reap_workers(waitpidflags = 0) ⇒ Object
Private: Check for workers that have stopped running
177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/resqued/listener.rb', line 177 def reap_workers(waitpidflags = 0) loop do worker_pid, status = Process.waitpid2(-1, waitpidflags) return :none_ready if worker_pid.nil? log "Worker exited #{status}" finish_worker(worker_pid, status) report_to_master("-#{worker_pid}") end rescue Errno::ECHILD # All done :no_child end |
#report_to_master(status) ⇒ Object
Private: Report child process status.
Examples:
report_to_master("+12345,queue") # Worker process PID:12345 started, working on a job from "queue".
report_to_master("-12345") # Worker process PID:12345 exited.
243 244 245 246 247 248 249 |
# File 'lib/resqued/listener.rb', line 243 def report_to_master(status) @socket&.puts(status) rescue Errno::EPIPE => e @socket = nil log "#{e.class.name} while writing to master" Process.kill(:QUIT, $$) # If the master is gone, LIFE IS NOW MEANINGLESS. end |
#run ⇒ Object
Public: Run the main loop.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/resqued/listener.rb', line 71 def run trap(:HUP) {} # ignore this, in case it trickles in from the master. trap(:CHLD) { awake } SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } } @socket.close_on_exec = true write_procline("starting") config = Resqued::Config.new(@config_paths) set_default_resque_logger config.before_fork(info) report_to_master("RUNNING") write_procline("running") init_workers(config) exit_signal = run_workers_run write_procline("shutdown") burn_down_workers(exit_signal || :QUIT) @socket&.close @socket = nil end |
#run_workers_run ⇒ Object
Private.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/resqued/listener.rb', line 104 def run_workers_run loop do reap_workers(Process::WNOHANG) check_for_expired_workers start_idle_workers write_procline("running") case signal = SIGNAL_QUEUE.shift when nil yawn when :CONT kill_all(signal) when :QUIT, :INT, :TERM return signal end end end |
#running_workers ⇒ Object
Private: just the running workers.
152 153 154 |
# File 'lib/resqued/listener.rb', line 152 def running_workers partition_workers.last end |
#set_default_resque_logger ⇒ Object
Private.
94 95 96 97 98 99 100 101 |
# File 'lib/resqued/listener.rb', line 94 def set_default_resque_logger require "resque" if Resque.respond_to?("logger=") Resque.logger = Resqued::Logging.build_logger end rescue LoadError # rubocop: disable Lint/SuppressedException # Skip this step. end |
#start_idle_workers ⇒ Object
Private.
216 217 218 219 220 221 222 223 224 225 |
# File 'lib/resqued/listener.rb', line 216 def start_idle_workers workers.each do |worker| next unless worker.idle? worker.try_start if pid = worker.pid report_to_master("+#{pid},#{worker.queue_key}") end end end |
#write_procline(status) ⇒ Object
Private.
252 253 254 255 256 257 258 259 260 |
# File 'lib/resqued/listener.rb', line 252 def write_procline(status) procline = "#{procline_version} listener" procline << " \##{@listener_id}" if @listener_id procline << " #{my_workers.size}/#{running_workers.size}/#{workers.size}" if workers procline << " [#{info.app_version}]" if info.app_version procline << " [#{status}]" procline << " #{@config_paths.join(' ')}" $0 = procline end |
#yawn(sleep_time = nil) ⇒ Object
Private.
167 168 169 170 171 172 173 174 |
# File 'lib/resqued/listener.rb', line 167 def yawn(sleep_time = nil) sleep_time ||= begin sleep_times = [60.0] + workers.map { |worker| worker.backing_off_for } [sleep_times.compact.min, 0.0].max end super(sleep_time, @socket) end |