Class: Resqued::Listener

Inherits:
Object
  • Object
show all
Includes:
Logging, ProclineVersion, Sleepy
Defined in:
lib/resqued/listener.rb

Overview

A listener process. Watches resque queues and forks workers.

Constant Summary collapse

# Signals whose handlers push onto SIGNAL_QUEUE for the main loop to act on.
SIGNALS = %i[CONT QUIT INT TERM].freeze
# SIGNALS plus :CHLD and :HUP, which #run also installs handlers for.
# Frozen so the list cannot be mutated by accident.
ALL_SIGNALS = (SIGNALS + %i[CHLD HUP]).freeze
SIGNAL_QUEUE =
[] # rubocop: disable Style/MutableConstant (intentionally mutable: the signal handlers append to it)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Listener

Configure a new listener object.

Runs in the master process.



21
22
23
24
25
26
# File 'lib/resqued/listener.rb', line 21

# Public: Configure a new listener object. Runs in the master process.
#
# options - Hash of settings:
#           :config_paths - required; stored as-is (later joined with ":"
#                           by #exec and handed to Resqued::Config by #run).
#           :old_workers  - optional Array of {pid:, queue_key:} Hashes;
#                           defaults to []. The Array is frozen in place.
#           :socket       - required; the connection to the master.
#           :listener_id  - optional; defaults to nil.
#
# Raises KeyError when :config_paths or :socket is missing.
def initialize(options)
  @config_paths = options.fetch(:config_paths)
  @old_workers  = options.fetch(:old_workers, []).freeze
  @socket       = options.fetch(:socket)
  @listener_id  = options.fetch(:listener_id, nil)
end

Instance Attribute Details

#workers ⇒ Object (readonly)

Private: all available workers



149
150
151
# File 'lib/resqued/listener.rb', line 149

# Private: all available workers.
#
# Returns the value stored in @workers by #init_workers, or nil when
# it has not been assigned yet.
def workers
  @workers if defined?(@workers)
end

Class Method Details

.exec! ⇒ Object

Public: Given args from #exec, start this listener.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resqued/listener.rb', line 48

# Public: Given args from #exec, start this listener.
#
# Rebuilds the options Hash from the RESQUED_* environment variables
# (each one is optional and skipped when unset), then constructs a
# listener with those options and calls #run on it.
#
#   RESQUED_SOCKET      - file descriptor number, wrapped via Socket.for_fd.
#   RESQUED_CONFIG_PATH - ":"-separated list of config paths.
#   RESQUED_STATE       - "pid|queue_key" entries separated by "||".
#   RESQUED_LISTENER_ID - passed through unchanged (as a String).
#
# Returns whatever #run returns.
def self.exec!
  options = {}

  socket_fd = ENV["RESQUED_SOCKET"]
  options[:socket] = Socket.for_fd(socket_fd.to_i) if socket_fd

  config_path = ENV["RESQUED_CONFIG_PATH"]
  options[:config_paths] = config_path.split(":") if config_path

  state = ENV["RESQUED_STATE"]
  if state
    options[:old_workers] = state.split("||").map do |entry|
      %i[pid queue_key].zip(entry.split("|")).to_h
    end
  end

  listener_id = ENV["RESQUED_LISTENER_ID"]
  options[:listener_id] = listener_id if listener_id

  new(options).run
end

Instance Method Details

#burn_down_workers(signal) ⇒ Object

Private: make sure all the workers stop.

Resque workers have gaps in their signal-handling ability.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/resqued/listener.rb', line 124

# Private: make sure all the workers stop.
#
# Resque workers have gaps in their signal-handling ability, so the
# signal is re-sent on every pass until no child processes remain.
#
# signal - the signal handed to #kill_all on each pass.
#
# Returns the result of the final blocking #reap_workers call.
def burn_down_workers(signal)
  loop do
    check_for_expired_workers
    write_procline("shutdown")
    # Signals that arrive during shutdown are discarded, not acted on.
    SIGNAL_QUEUE.clear

    reaped = reap_workers(Process::WNOHANG)
    break if reaped == :no_child

    kill_all(signal)

    # Don't kill any more often than every 1s.
    sleep 1
    yawn 5
  end

  # One last time.
  reap_workers
end

#check_for_expired_workers ⇒ Object

Private: Check if master reports any dead workers.



192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/resqued/listener.rb', line 192

# Private: Check if master reports any dead workers.
#
# Drains every line currently readable on the master socket without
# blocking (zero-timeout IO.select). Each line is converted with #to_i
# and passed to #finish_worker as a pid, with a nil status.
#
# If the master end is gone (EOFError / Errno::ECONNRESET), the socket
# is dropped, the error class is logged, and this process sends itself
# QUIT.
#
# Returns nil when there is no socket or nothing more to read.
def check_for_expired_workers
  return unless @socket

  while IO.select([@socket], nil, nil, 0)
    dead_pid = @socket.readline.to_i
    finish_worker(dead_pid, nil)
  end
rescue EOFError, Errno::ECONNRESET => error
  @socket = nil
  log "#{error.class.name} while reading from master"
  Process.kill(:QUIT, $$)
end

#exec ⇒ Object

Public: As an alternative to #run, exec a new ruby instance for this listener.

Runs in the master process.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/resqued/listener.rb', line 31

# Public: As an alternative to #run, exec a new ruby instance for this
# listener. Runs in the master process.
#
# The listener's state is exported through RESQUED_* environment
# variables (the counterpart of what .exec! reads back), then the
# program recorded in Resqued::START_CTX["$0"] is exec'd with the
# "listener" argument.
def exec
  fd = @socket.to_i
  ENV["RESQUED_SOCKET"]         = fd.to_s
  ENV["RESQUED_CONFIG_PATH"]    = @config_paths.join(":")
  ENV["RESQUED_STATE"]          = @old_workers.map { |w| "#{w[:pid]}|#{w[:queue_key]}" }.join("||")
  ENV["RESQUED_LISTENER_ID"]    = @listener_id.to_s
  ENV["RESQUED_MASTER_VERSION"] = Resqued::VERSION

  program = Resqued::START_CTX["$0"]
  log "exec: #{program} listener"

  # Ruby 2.0 needs to be told to keep the file descriptor open during exec.
  exec_opts = { fd => fd }
  start_pwd = Resqued::START_CTX["pwd"]
  exec_opts[:chdir] = start_pwd if start_pwd

  # Padding argument: make room for setproctitle.
  padding = " " * 256
  Kernel.exec(program, "listener", padding, exec_opts)
end

#finish_worker(worker_pid, status) ⇒ Object

Private.



207
208
209
210
211
212
213
# File 'lib/resqued/listener.rb', line 207

# Private: mark every worker whose pid matches as finished.
#
# worker_pid - pid to look for among #workers.
# status     - passed straight to the worker's #finished!; it is nil
#              when the call comes from #check_for_expired_workers.
#
# Returns the workers collection.
def finish_worker(worker_pid, status)
  workers.each do |candidate|
    next unless candidate.pid == worker_pid

    candidate.finished!(status)
  end
end

#info ⇒ Object

Private.



263
264
265
# File 'lib/resqued/listener.rb', line 263

# Private: the listener's RuntimeInfo, created on first use and reused
# on every later call.
def info
  return @info if @info

  @info = RuntimeInfo.new
end

#init_workers(config) ⇒ Object

Private.



228
229
230
231
232
233
234
235
# File 'lib/resqued/listener.rb', line 228

# Private: build the worker list and pair new workers with old ones.
#
# config - object responding to #build_workers; its result becomes
#          @workers.
#
# For each entry in @old_workers, the first still-idle worker with the
# same queue_key is told to wait for the old worker's pid (converted
# with #to_i). Old workers with no idle match are skipped.
#
# Returns @old_workers.
def init_workers(config)
  @workers = config.build_workers
  @old_workers.each do |previous|
    successor = @workers.find do |worker|
      worker.idle? && worker.queue_key == previous[:queue_key]
    end
    successor&.wait_for(previous[:pid].to_i)
  end
end

#kill_all(signal) ⇒ Object

Private: send a signal to all the workers.



142
143
144
145
146
# File 'lib/resqued/listener.rb', line 142

# Private: send a signal to all the running workers.
#
# signal - forwarded to each running worker's #kill.
#
# Logs the signal and the target pids first. Returns the collection of
# running workers that were signalled.
def kill_all(signal)
  targets = running_workers
  pids = targets.map(&:pid)
  log "kill -#{signal} #{pids.inspect}"
  targets.each { |target| target.kill(signal) }
end

#my_workers ⇒ Object

Private: just the workers running as children of this listener.



157
158
159
# File 'lib/resqued/listener.rb', line 157

# Private: just the workers running as children of this listener,
# i.e. those whose #running_here? is truthy.
def my_workers
  workers.select(&:running_here?)
end

#partition_workers ⇒ Object

Private: Split the workers into [not-running, running]



162
163
164
# File 'lib/resqued/listener.rb', line 162

# Private: Split the workers into [not-running, running].
#
# Returns a two-element Array: workers whose #idle? is truthy first,
# all the others second.
def partition_workers
  workers.partition(&:idle?)
end

#reap_workers(waitpidflags = 0) ⇒ Object

Private: Check for workers that have stopped running



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/resqued/listener.rb', line 177

# Private: Check for workers that have stopped running.
#
# waitpidflags - flags for Process.waitpid2 (default 0, which blocks;
#                callers pass Process::WNOHANG to poll).
#
# Every reaped child is logged, marked finished via #finish_worker, and
# reported to the master as "-<pid>".
#
# Returns :none_ready when waitpid2 yields no pid, or :no_child once
# there are no child processes left (Errno::ECHILD).
def reap_workers(waitpidflags = 0)
  loop do
    pid, exit_status = Process.waitpid2(-1, waitpidflags)
    return :none_ready unless pid

    log "Worker exited #{exit_status}"
    finish_worker(pid, exit_status)
    report_to_master("-#{pid}")
  end
rescue Errno::ECHILD
  # All done
  :no_child
end

#report_to_master(status) ⇒ Object

Private: Report child process status.

Examples:

report_to_master("+12345,queue")  # Worker process PID:12345 started, working on a job from "queue".
report_to_master("-12345")        # Worker process PID:12345 exited.


243
244
245
246
247
248
249
# File 'lib/resqued/listener.rb', line 243

# Private: Report child process status.
#
# status - line to write to the master socket. Nothing is written when
#          the socket is nil.
#
# Examples:
#
#   report_to_master("+12345,queue")  # Worker process PID:12345 started, working on a job from "queue".
#   report_to_master("-12345")        # Worker process PID:12345 exited.
#
# On Errno::EPIPE the socket is dropped, the error class is logged, and
# this process sends itself QUIT.
def report_to_master(status)
  @socket.puts(status) if @socket
rescue Errno::EPIPE => error
  @socket = nil
  log "#{error.class.name} while writing to master"
  # If the master is gone, LIFE IS NOW MEANINGLESS.
  Process.kill(:QUIT, $$)
end

#run ⇒ Object

Public: Run the main loop.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/resqued/listener.rb', line 71

# Public: Run the main loop.
#
# Installs the signal handlers, loads the configuration, tells the
# master the listener is "RUNNING", supervises workers until an exit
# signal is dequeued, then stops every worker and closes the master
# socket.
def run
  trap(:HUP) {} # ignore this, in case it trickles in from the master.
  # NOTE(review): #awake comes from Sleepy; presumably it interrupts a pending #yawn - confirm.
  trap(:CHLD) { awake }
  # Handlers only enqueue the signal; #run_workers_run acts on it later.
  SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } }
  # Mark the master socket close-on-exec.
  @socket.close_on_exec = true
  write_procline("starting")

  config = Resqued::Config.new(@config_paths)
  set_default_resque_logger
  config.before_fork(info)
  report_to_master("RUNNING")

  write_procline("running")
  init_workers(config)
  # Returns only once :QUIT, :INT or :TERM has been dequeued.
  exit_signal = run_workers_run

  write_procline("shutdown")
  # Forward the signal that ended the loop to the workers (:QUIT as a fallback).
  burn_down_workers(exit_signal || :QUIT)
  # @socket may already be nil if the master connection was lost earlier.
  @socket&.close
  @socket = nil
end

#run_workers_run ⇒ Object

Private.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/resqued/listener.rb', line 104

# Private: the supervision loop.
#
# Each pass reaps exited children without blocking, processes death
# reports from the master, starts idle workers, refreshes the procline,
# and then handles at most one queued signal:
#
#   (none)            - sleep via #yawn.
#   :CONT             - forwarded to all running workers.
#   :QUIT/:INT/:TERM  - ends the loop.
#
# Any other queued value is dropped.
#
# Returns the Symbol of the signal that ended the loop.
def run_workers_run
  loop do
    reap_workers(Process::WNOHANG)
    check_for_expired_workers
    start_idle_workers
    write_procline("running")

    signal = SIGNAL_QUEUE.shift
    if signal.nil?
      yawn
    elsif signal == :CONT
      kill_all(signal)
    elsif %i[QUIT INT TERM].include?(signal)
      return signal
    end
  end
end

#running_workers ⇒ Object

Private: just the running workers.



152
153
154
# File 'lib/resqued/listener.rb', line 152

# Private: just the running workers, i.e. the second half of
# #partition_workers (the ones that are not idle).
def running_workers
  _idle, running = partition_workers
  running
end

#set_default_resque_logger ⇒ Object

Private.



94
95
96
97
98
99
100
101
# File 'lib/resqued/listener.rb', line 94

# Private: point Resque's logger at a logger built by Resqued::Logging.
#
# Does nothing when the "resque" library cannot be loaded, or when
# Resque does not respond to `logger=`.
def set_default_resque_logger
  require "resque"
  return unless Resque.respond_to?("logger=")

  Resque.logger = Resqued::Logging.build_logger
rescue LoadError # rubocop: disable Lint/SuppressedException
  # Skip this step.
end

#start_idle_workers ⇒ Object

Private.



216
217
218
219
220
221
222
223
224
225
# File 'lib/resqued/listener.rb', line 216

# Private: try to start every idle worker.
#
# After #try_start, a worker that now has a pid is reported to the
# master as "+<pid>,<queue_key>". Workers that are not idle are left
# alone.
#
# Returns the workers collection.
def start_idle_workers
  workers.each do |worker|
    next unless worker.idle?

    worker.try_start
    started_pid = worker.pid
    report_to_master("+#{started_pid},#{worker.queue_key}") if started_pid
  end
end

#write_procline(status) ⇒ Object

Private.



252
253
254
255
256
257
258
259
260
# File 'lib/resqued/listener.rb', line 252

# Private: set the process title ($0) to describe the listener.
#
# status - short state label, shown in square brackets.
#
# The title is made of space-separated parts, in order: the procline
# version followed by "listener", "#<listener_id>" (when set),
# "<mine>/<running>/<total>" worker counts (once workers exist),
# "[<app_version>]" (when info reports one), "[<status>]", and the
# config paths.
#
# Returns the new title.
def write_procline(status)
  parts = ["#{procline_version} listener"]
  parts << "\##{@listener_id}" if @listener_id
  parts << "#{my_workers.size}/#{running_workers.size}/#{workers.size}" if workers
  parts << "[#{info.app_version}]" if info.app_version
  parts << "[#{status}]"
  parts << @config_paths.join(" ")
  $0 = parts.join(" ")
end

#yawn(sleep_time = nil) ⇒ Object

Private.



167
168
169
170
171
172
173
174
# File 'lib/resqued/listener.rb', line 167

# Private: sleep until something interesting may have happened.
#
# sleep_time - how long to sleep. When omitted, it is the smallest
#              non-nil #backing_off_for among the workers, capped at
#              60.0 and never below 0.0.
#
# Delegates to the inherited #yawn with the computed time and the
# master socket.
def yawn(sleep_time = nil)
  unless sleep_time
    backoffs = workers.map(&:backing_off_for)
    shortest = [60.0, *backoffs].compact.min
    sleep_time = [shortest, 0.0].max
  end
  super(sleep_time, @socket)
end