Class: Resqued::Listener

Inherits:
Object
  • Object
show all
Includes:
Logging, ProclineVersion, Sleepy
Defined in:
lib/resqued/listener.rb

Overview

A listener process. Watches resque queues and forks workers.

Constant Summary collapse

SIGNALS =
[ :CONT, :QUIT, :INT, :TERM ]
ALL_SIGNALS =
SIGNALS + [ :CHLD ]
SIGNAL_QUEUE =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Listener

Configure a new listener object.

Runs in the master process.



21
22
23
24
25
26
# File 'lib/resqued/listener.rb', line 21

# Configure a new listener object from an options collection.
#
# options - read via #fetch with these keys:
#           :config_paths - required; KeyError is raised when missing.
#           :old_workers  - optional; defaults to an empty list. The value
#                           is frozen before being stored.
#           :socket       - required; KeyError is raised when missing.
#           :listener_id  - optional; defaults to nil.
def initialize(options)
  @config_paths = options.fetch(:config_paths)
  @old_workers = options.fetch(:old_workers, []).freeze
  @socket = options.fetch(:socket)
  @listener_id = options.fetch(:listener_id, nil)
end

Instance Attribute Details

#workers ⇒ Object (readonly)

Private: all available workers



143
144
145
# File 'lib/resqued/listener.rb', line 143

# Private: all available workers.
#
# Returns the current value of @workers unchanged (nil until something
# assigns it).
def workers
  @workers
end

Class Method Details

.exec! ⇒ Object

Public: Given args from #exec, start this listener.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resqued/listener.rb', line 48

# Public: Given the environment prepared by #exec, start this listener.
#
# Reads these environment variables; each one is only used when set:
#   RESQUED_SOCKET      - file descriptor number, wrapped with Socket.for_fd.
#   RESQUED_CONFIG_PATH - ':'-separated list of config paths.
#   RESQUED_STATE       - '||'-separated records, each "pid|queue".
#   RESQUED_LISTENER_ID - listener id; an empty value means "no id".
#
# Returns whatever #run returns for the newly built listener.
def self.exec!
  options = {}
  if (socket = ENV['RESQUED_SOCKET'])
    options[:socket] = Socket.for_fd(socket.to_i)
  end
  if (path = ENV['RESQUED_CONFIG_PATH'])
    options[:config_paths] = path.split(':')
  end
  if (state = ENV['RESQUED_STATE'])
    # Each record becomes { :pid => "...", :queue => "..." } (values stay strings).
    options[:old_workers] = state.split('||').map { |record| Hash[[:pid, :queue].zip(record.split('|'))] }
  end
  # #exec exports the id with #to_s, so a nil id arrives here as "". An empty
  # string is truthy in Ruby; without this check it would be stored as the id
  # and show up as a dangling " #" in the procline.
  listener_id = ENV['RESQUED_LISTENER_ID']
  options[:listener_id] = listener_id unless listener_id.nil? || listener_id.empty?
  new(options).run
end

Instance Method Details

#burn_down_workers(signal) ⇒ Object

Private: make sure all the workers stop.

Resque workers have gaps in their signal-handling ability.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/resqued/listener.rb', line 119

# Private: make sure all the workers stop.
#
# Resque workers have gaps in their signal-handling ability, so the signal
# is re-sent on every pass of the loop until no child process remains.
#
# signal - the signal handed to #kill_all on each pass.
def burn_down_workers(signal)
  loop do
    check_for_expired_workers
    write_procline('shutdown')
    # Drop any signals queued since the last pass.
    SIGNAL_QUEUE.clear

    # reap_workers answers :no_child once waitpid2 raises ECHILD, i.e. there
    # is nothing left to wait for.
    break if :no_child == reap_workers(Process::WNOHANG)
    kill_all(signal)

    sleep 1 # Don't kill any more often than every 1s.
    yawn 5
  end
  # One last time.
  reap_workers
end

#check_for_expired_workers ⇒ Object

Private: Check if master reports any dead workers.



188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/resqued/listener.rb', line 188

# Private: Check if master reports any dead workers.
#
# While the master socket has data ready (polled with a zero timeout), reads
# one line, converts it to an Integer pid and marks the matching workers
# finished with a nil status. Does nothing when there is no socket.
#
# If the socket hits EOF or is reset, the socket is dropped, the error class
# is logged and this process sends itself QUIT.
def check_for_expired_workers
  return unless @socket
  while IO.select([@socket], nil, nil, 0)
    expired_pid = @socket.readline.to_i
    finish_worker(expired_pid, nil)
  end
rescue EOFError, Errno::ECONNRESET => error
  @socket = nil
  log "#{error.class.name} while reading from master"
  Process.kill(:QUIT, $$)
end

#exec ⇒ Object

Public: As an alternative to #run, exec a new ruby instance for this listener.

Runs in the master process.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/resqued/listener.rb', line 31

# Public: As an alternative to #run, exec a new ruby instance for this
# listener.
#
# Runs in the master process. The listener's settings are exported through
# RESQUED_* environment variables, then the current process is replaced via
# Kernel.exec, so this method does not return on success.
def exec
  fd = @socket.to_i
  serialized_workers = @old_workers.map { |entry| "#{entry[:pid]}|#{entry[:queue]}" }.join('||')

  ENV['RESQUED_SOCKET'] = fd.to_s
  ENV['RESQUED_CONFIG_PATH'] = @config_paths.join(':')
  ENV['RESQUED_STATE'] = serialized_workers
  ENV['RESQUED_LISTENER_ID'] = @listener_id.to_s
  ENV['RESQUED_MASTER_VERSION'] = Resqued::VERSION

  program = Resqued::START_CTX['$0']
  log "exec: #{program} listener"

  # Ruby 2.0 needs to be told to keep the file descriptor open during exec.
  spawn_options = { fd => fd }
  original_dir = Resqued::START_CTX['pwd']
  spawn_options[:chdir] = original_dir if original_dir

  # The 256-space argument makes room for setproctitle.
  Kernel.exec(program, 'listener', ' ' * 256, spawn_options)
end

#finish_worker(worker_pid, status) ⇒ Object

Private.



202
203
204
205
206
207
208
# File 'lib/resqued/listener.rb', line 202

# Private: mark every worker whose pid equals worker_pid as finished.
#
# worker_pid - pid to match against each worker's #pid.
# status     - value passed through to the worker's #finished!.
#
# Returns the workers collection that was iterated.
def finish_worker(worker_pid, status)
  workers.each do |candidate|
    next unless candidate.pid == worker_pid
    candidate.finished!(status)
  end
end

#info ⇒ Object

Private.



258
259
260
# File 'lib/resqued/listener.rb', line 258

# Private: the listener's RuntimeInfo, created on first use and then reused.
def info
  @info = RuntimeInfo.new unless @info
  @info
end

#init_workers(config) ⇒ Object

Private.



223
224
225
226
227
228
229
230
# File 'lib/resqued/listener.rb', line 223

# Private: build this listener's workers and pair them with old workers.
#
# config - object whose #build_workers result becomes @workers.
#
# For each entry in @old_workers, the first worker that is idle and whose
# queue_key equals the entry's :queue is told to wait for the entry's :pid
# (converted to an Integer). Entries with no matching worker are skipped.
def init_workers(config)
  @workers = config.build_workers
  @old_workers.each do |previous|
    successor = @workers.find do |candidate|
      candidate.idle? && candidate.queue_key == previous[:queue]
    end
    successor.wait_for(previous[:pid].to_i) if successor
  end
end

#kill_all(signal) ⇒ Object

Private: send a signal to all the workers.



136
137
138
139
140
# File 'lib/resqued/listener.rb', line 136

# Private: send a signal to all the running workers.
#
# signal - the signal passed to each running worker's #kill.
#
# Logs the signal together with the pids of the workers about to receive it.
# Idle workers are left alone.
#
# Returns the list of running workers.
def kill_all(signal)
  # Only the second half of the partition (the running workers) is needed.
  _idle, running = partition_workers
  log "kill -#{signal} #{running.map(&:pid).inspect}"
  running.each { |worker| worker.kill(signal) }
end

#my_workers ⇒ Object

Private: just the workers running as children of this listener.



151
152
153
# File 'lib/resqued/listener.rb', line 151

# Private: just the workers running as children of this listener, i.e. those
# answering true to #running_here?.
def my_workers
  workers.select(&:running_here?)
end

#partition_workers ⇒ Object

Private: Split the workers into [not-running, running]



156
157
158
# File 'lib/resqued/listener.rb', line 156

# Private: Split the workers into [not-running, running], where "not
# running" means the worker answers true to #idle?.
def partition_workers
  workers.partition(&:idle?)
end

#reap_workers(waitpidflags = 0) ⇒ Object

Private: Check for workers that have stopped running



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/resqued/listener.rb', line 171

# Private: Check for workers that have stopped running.
#
# waitpidflags - flags for Process.waitpid2 (default 0; callers pass
#                Process::WNOHANG to avoid blocking).
#
# Every child that has exited is logged, marked finished and reported to the
# master as "-<pid>".
#
# Returns :none_ready when waitpid2 yields no pid, or :no_child once there
# are no child processes left to wait for.
def reap_workers(waitpidflags = 0)
  loop do
    exited_pid, exit_status = Process.waitpid2(-1, waitpidflags)
    return :none_ready if exited_pid.nil?

    log "Worker exited #{exit_status}"
    finish_worker(exited_pid, exit_status)
    report_to_master("-#{exited_pid}")
  end
rescue Errno::ECHILD
  # All done
  :no_child
end

#report_to_master(status) ⇒ Object

Private: Report child process status.

Examples:

report_to_master("+12345,queue")  # Worker process PID:12345 started, working on a job from "queue".
report_to_master("-12345")        # Worker process PID:12345 exited.


238
239
240
241
242
243
244
# File 'lib/resqued/listener.rb', line 238

# Private: Report child process status.
#
# status - the line to write to the master socket.
#
# Examples:
#
#   report_to_master("+12345,queue")  # Worker process PID:12345 started, working on a job from "queue".
#   report_to_master("-12345")        # Worker process PID:12345 exited.
#
# Nothing is written when there is no socket. On a broken pipe the socket is
# dropped, the error class is logged and this process sends itself QUIT.
def report_to_master(status)
  return unless @socket
  @socket.puts(status)
rescue Errno::EPIPE => error
  @socket = nil
  log "#{error.class.name} while writing to master"
  Process.kill(:QUIT, $$) # If the master is gone, LIFE IS NOW MEANINGLESS.
end

#run ⇒ Object

Public: Run the main loop.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/resqued/listener.rb', line 71

# Public: Run the main loop.
#
# Installs signal handlers, loads the configuration, reports "RUNNING" to
# the master, keeps workers running until an exit signal is dequeued, and
# then shuts the workers down with that signal.
def run
  # CHLD is not queued; its handler only calls #awake (from Sleepy —
  # presumably this interrupts the sleep in #yawn; confirm there).
  trap(:CHLD) { awake }
  # The remaining signals are queued for the main loop to act on.
  SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal ; awake } }
  @socket.close_on_exec = true
  write_procline('starting')

  config = Resqued::Config.new(@config_paths)
  set_default_resque_logger
  config.before_fork(info)
  report_to_master("RUNNING")

  write_procline('running')
  init_workers(config)
  # Returns only once :QUIT, :INT or :TERM has been dequeued.
  exit_signal = run_workers_run

  write_procline('shutdown')
  # Fall back to :QUIT if no exit signal was returned.
  burn_down_workers(exit_signal || :QUIT)
end

#run_workers_run ⇒ Object

Private.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/resqued/listener.rb', line 99

# Private: the steady-state loop.
#
# Each pass reaps exited workers (non-blocking), processes expiry notices
# from the master, starts idle workers and refreshes the procline. It then
# handles at most one queued signal:
#
#   none              - sleep via #yawn.
#   :CONT             - forward the signal to the running workers.
#   :QUIT/:INT/:TERM  - stop looping.
#
# Returns the exit signal that ended the loop.
def run_workers_run
  loop do
    reap_workers(Process::WNOHANG)
    check_for_expired_workers
    start_idle_workers
    write_procline('running')

    pending = SIGNAL_QUEUE.shift
    if pending.nil?
      yawn
    elsif pending == :CONT
      kill_all(pending)
    elsif [:QUIT, :INT, :TERM].include?(pending)
      return pending
    end
  end
end

#running_workers ⇒ Object

Private: just the running workers.



146
147
148
# File 'lib/resqued/listener.rb', line 146

# Private: just the running workers, i.e. the second group returned by
# #partition_workers.
def running_workers
  _idle, running = partition_workers
  running
end

#set_default_resque_logger ⇒ Object

Private.



91
92
93
94
95
96
# File 'lib/resqued/listener.rb', line 91

# Private: point Resque's logger at a logger built by Resqued::Logging.
#
# Resque is required here, at call time. The logger is only assigned when
# the loaded Resque responds to `logger=`.
def set_default_resque_logger
  require 'resque'
  Resque.logger = Resqued::Logging.build_logger if Resque.respond_to?('logger=')
end

#start_idle_workers ⇒ Object

Private.



211
212
213
214
215
216
217
218
219
220
# File 'lib/resqued/listener.rb', line 211

# Private: try to start every idle worker.
#
# After #try_start, a worker that has a pid is reported to the master as
# "+<pid>,<queue_key>". Workers that are not idle are skipped.
#
# Returns the workers collection that was iterated.
def start_idle_workers
  workers.each do |worker|
    next unless worker.idle?

    worker.try_start
    started_pid = worker.pid
    report_to_master("+#{started_pid},#{worker.queue_key}") if started_pid
  end
end

#write_procline(status) ⇒ Object

Private.



247
248
249
250
251
252
253
254
255
# File 'lib/resqued/listener.rb', line 247

# Private: update the process title ($0) to describe this listener.
#
# status - short state label, shown in square brackets.
#
# The title is made of these space-separated parts, in order:
#   "<procline_version> listener"
#   "#<listener_id>"                (only when a listener id is set)
#   "<mine>/<running>/<total>"      (worker counts; only when workers exist)
#   "[<app_version>]"               (only when info reports one)
#   "[<status>]"
#   the config paths
#
# Returns the new title.
def write_procline(status)
  segments = ["#{procline_version} listener"]
  segments << "\##{@listener_id}" if @listener_id
  segments << "#{my_workers.size}/#{running_workers.size}/#{workers.size}" if workers
  app_version = info.app_version
  segments << "[#{app_version}]" if app_version
  segments << "[#{status}]"
  segments << @config_paths.join(' ')
  $0 = segments.join(' ')
end

#yawn(sleep_time = nil) ⇒ Object

Private.



161
162
163
164
165
166
167
168
# File 'lib/resqued/listener.rb', line 161

# Private: sleep via the inherited #yawn, passing the master socket along.
#
# sleep_time - how long to sleep. When omitted, the smallest non-nil
#              worker #backing_off_for value is used, capped at 60.0 and
#              never below 0.0.
def yawn(sleep_time = nil)
  unless sleep_time
    candidates = [60.0, *workers.map(&:backing_off_for)].compact
    sleep_time = [candidates.min, 0.0].max
  end
  super(sleep_time, @socket)
end