Class: Resqued::Master

Inherits:
Object
  • Object
show all
Includes:
Logging, Pidfile, ProclineVersion, Sleepy
Defined in:
lib/resqued/master.rb

Overview

The master process.

  • Spawns a listener.

  • Tracks all work. (IO pipe from listener.)

  • Handles signals.

Constant Summary collapse

# Signals that install_signal_handlers traps with a handler that pushes the
# signal onto SIGNAL_QUEUE and then calls awake.
SIGNALS =
[ :HUP, :INT, :USR2, :CONT, :TERM, :QUIT ]
# Trapped like SIGNALS, except that an error raised while installing the trap
# is ignored (install_signal_handlers uses `rescue nil`).
OPTIONAL_SIGNALS =
[ :INFO ]
# Trapped separately: :CHLD in install_signal_handlers (its handler only calls
# awake) and 'EXIT' in report_unexpected_exits.
OTHER_SIGNALS =
[:CHLD, 'EXIT']
# Every signal name from the three lists above, combined.
TRAPS =
SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
# Signals received but not yet handled: the trap handlers append to it and
# go_ham shifts one entry per loop iteration. It is mutated in place, so it
# must stay unfrozen.
SIGNAL_QUEUE =
[]

Instance Method Summary collapse

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Pidfile

#remove_pidfile, #with_pidfile, #write_pidfile

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Master

Returns a new instance of Master.



19
20
21
22
23
24
25
26
# File 'lib/resqued/master.rb', line 19

# Public: Set up a master from a collection of options.
#
# options - Object responding to fetch (a Hash in practice):
#           :config_paths   - Required; fetched without a default, so
#                             Hash#fetch raises KeyError when it is missing.
#           :master_pidfile - Optional (default: nil).
#           :status_pipe    - Optional (default: nil).
#           :fast_exit      - Optional (default: false).
def initialize(options)
  @config_paths      = options.fetch(:config_paths)
  @pidfile           = options.fetch(:master_pidfile, nil)
  @status_pipe       = options.fetch(:status_pipe, nil)
  @fast_exit         = options.fetch(:fast_exit, false)
  # No listener has been started yet.
  @listeners_created = 0
  @listener_backoff  = Backoff.new
end

Instance Method Details

#all_listeners ⇒ Object

Private: All the ListenerProxy objects.



109
110
111
# File 'lib/resqued/master.rb', line 109

# Private: All the ListenerProxy objects the master is tracking.
#
# Returns an Array holding the values of the listener_pids Hash.
def all_listeners
  listener_pids.values
end

#dump_object_counts ⇒ Object

Private.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/resqued/master.rb', line 77

# Private: Log GC statistics and the ten classes with the most live objects.
#
# Each class line also shows the change since the previous dump when one was
# recorded in @last_counts. Any StandardError raised while counting is
# logged rather than propagated.
def dump_object_counts
  log GC.stat.inspect
  tally = {}
  object_total = 0
  ObjectSpace.each_object do |obj|
    class_name = obj.class.name
    tally[class_name] = (tally[class_name] || 0) + 1
    object_total += 1
  end
  limit = 10
  log "#{object_total} objects. top #{limit}:"
  previous = @last_counts
  # Largest counts first; only the first `limit` entries are reported.
  tally.sort_by { |_class_name, n| n }.reverse.first(limit).each do |class_name, n|
    before = previous && previous[class_name]
    delta = before ? " (#{format('%+d', n - before)})" : ""
    log "   #{n} #{class_name}#{delta}"
  end
  @last_counts = tally
  log GC.stat.inspect
rescue => e
  log "Error while counting objects: #{e}"
end

#go_ham ⇒ Object

Private: The main loop.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/resqued/master.rb', line 44

# Private: The main loop.
#
# Every pass reads listener status, reaps exited listeners without blocking,
# starts a listener unless paused, and then handles at most one queued
# signal. Returns once :INT, :TERM or :QUIT has been handled.
def go_ham
  loop do
    read_listeners
    reap_all_listeners(Process::WNOHANG)
    start_listener unless @paused

    pending = SIGNAL_QUEUE.shift
    if pending.nil?
      # Nothing queued: rest for the backoff interval, or 30 seconds when
      # the backoff does not supply one.
      yawn(@listener_backoff.how_long? || 30.0)
      next
    end

    case pending
    when :INFO
      dump_object_counts
    when :HUP
      reopen_logs
      log "Restarting listener with new configuration and application."
      prepare_new_listener
    when :USR2
      log "Pause job processing"
      @paused = true
      kill_listener(:QUIT, @current_listener)
      @current_listener = nil
    when :CONT
      log "Resume job processing"
      @paused = false
      kill_all_listeners(:CONT)
    when :INT, :TERM, :QUIT
      log "Shutting down..."
      # Forward the same signal the master received.
      kill_all_listeners(pending)
      wait_for_workers unless @fast_exit
      break
    end
  end
end

#install_signal_handlers ⇒ Object



225
226
227
228
229
# File 'lib/resqued/master.rb', line 225

# Private: Install the master's signal traps.
#
# :CHLD only wakes the main loop. Every signal in SIGNALS and
# OPTIONAL_SIGNALS is appended to SIGNAL_QUEUE before waking it.
def install_signal_handlers
  trap(:CHLD) { awake }

  SIGNALS.each do |sig|
    trap(sig) do
      SIGNAL_QUEUE << sig
      awake
    end
  end

  OPTIONAL_SIGNALS.each do |sig|
    begin
      trap(sig) do
        SIGNAL_QUEUE << sig
        awake
      end
    rescue StandardError
      # Best effort: a failure to install an optional trap is ignored.
      nil
    end
  end
end

#kill_all_listeners(signal) ⇒ Object



183
184
185
186
187
# File 'lib/resqued/master.rb', line 183

# Private: Send a signal to every listener the master is tracking.
#
# signal - The signal passed to each listener's kill method.
def kill_all_listeners(signal)
  all_listeners.each { |listener| listener.kill(signal) }
end

#kill_listener(signal, listener) ⇒ Object



179
180
181
# File 'lib/resqued/master.rb', line 179

# Private: Send a signal to one listener, if there is one.
#
# signal   - The signal passed to the listener's kill method.
# listener - The listener to signal; nil or false is silently skipped.
#
# Returns whatever listener.kill returns, or nil when nothing was signalled.
def kill_listener(signal, listener)
  return unless listener

  listener.kill(signal)
end

#listener_pids ⇒ Object

Private: Map listener pids to ListenerProxy objects.



104
105
106
# File 'lib/resqued/master.rb', line 104

# Private: Map listener pids to ListenerProxy objects.
#
# The Hash is created empty on first use and memoized in @listener_pids.
def listener_pids
  @listener_pids ||= {}
end

#listener_running(listener) ⇒ Object

Listener message: A listener finished booting, and is ready to start workers.

Promotes a booting listener to be the current listener.



151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/resqued/master.rb', line 151

# Listener message: A listener finished booting, and is ready to start workers.
#
# listener - The listener that reported it is ready.
#
# When it is the current listener, the last good listener is told to quit and
# forgotten. Any other listener is told to quit itself.
def listener_running(listener)
  listener_status(listener, 'ready')

  # A listener other than the current one didn't receive the last SIGQUIT we
  # sent. (It was probably sent before the listener had set up its traps.)
  # So kill it again. We have moved on.
  return kill_listener(:QUIT, listener) unless listener == @current_listener

  kill_listener(:QUIT, @last_good_listener)
  @last_good_listener = nil
end

#listener_status(listener, status) ⇒ Object



254
255
256
257
258
# File 'lib/resqued/master.rb', line 254

# Private: Report a listener's status through status_message.
#
# listener - The listener to report on; skipped when it is nil or has no pid.
# status   - The status text to report.
def listener_status(listener, status)
  return unless listener

  pid = listener.pid
  status_message('listener', pid, status) if pid
end

#next_listener_id ⇒ Object



123
124
125
# File 'lib/resqued/master.rb', line 123

# Private: Increment the count of listeners created so far.
#
# Returns the incremented count; start_listener passes it to the new
# ListenerProxy as :listener_id.
def next_listener_id
  @listeners_created += 1
end

#no_more_unexpected_exits ⇒ Object



242
243
244
# File 'lib/resqued/master.rb', line 242

# Private: Reset the 'EXIT' trap to 'DEFAULT', replacing the handler that
# report_unexpected_exits installs. `run` calls this after the main loop
# has returned.
def no_more_unexpected_exits
  trap('EXIT', 'DEFAULT')
end

#prepare_new_listener ⇒ Object

Private: Spin up a new listener.

The old one will be killed when the new one is ready for workers.



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/resqued/master.rb', line 167

# Private: Spin up a new listener.
#
# The old one will be killed when the new one is ready for workers.
def prepare_new_listener
  outgoing = @current_listener

  # A last_good_listener that is still set means we got another HUP before the
  # new listener finished booting. Keep the last_good_listener (where all the
  # workers are) and kill the booting listener instead.
  kill_listener(:QUIT, outgoing) if @last_good_listener

  # Otherwise the listener being replaced becomes the last good one.
  @last_good_listener ||= outgoing

  # Indicate to `start_listener` that it should start a new listener.
  @current_listener = nil
end

#read_listeners ⇒ Object



127
128
129
130
131
# File 'lib/resqued/master.rb', line 127

# Private: Ask every listener to read its worker status, passing this
# master as the :on_activity receiver.
def read_listeners
  all_listeners.each { |listener| listener.read_worker_status(:on_activity => self) }
end

#reap_all_listeners(waitpid_flags = 0) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/resqued/master.rb', line 193

# Private: Reap exited child processes and forget the listeners they were.
#
# waitpid_flags - Flags for Process.waitpid2 (default: 0, which blocks until
#                 a child exits). With Process::WNOHANG the method returns
#                 as soon as no exited child is waiting to be reaped.
#
# Keeps reaping until no child is left (Errno::ECHILD) or, with WNOHANG,
# until none has exited yet.
#
# Returns nil.
def reap_all_listeners(waitpid_flags = 0)
  loop do
    lpid, status = Process.waitpid2(-1, waitpid_flags)
    # With WNOHANG, waitpid2 returns nil when no child has exited yet.
    return unless lpid

    log "Listener exited #{status}"

    if @current_listener && @current_listener.pid == lpid
      # The active listener died: tell the backoff before starting another.
      @listener_backoff.died
      @current_listener = nil
    end

    if @last_good_listener && @last_good_listener.pid == lpid
      @last_good_listener = nil
    end

    # waitpid2(-1) reaps any child, so the pid may not belong to a tracked
    # listener; only dispose of a proxy that was actually registered.
    if (dead_listener = listener_pids.delete(lpid))
      listener_status dead_listener, 'stop'
      dead_listener.dispose
    end
    write_procline
  end
rescue Errno::ECHILD
  # No children left to wait for.
  nil
end

#report_unexpected_exits ⇒ Object



231
232
233
234
235
236
237
238
239
240
# File 'lib/resqued/master.rb', line 231

# Private: Install an 'EXIT' trap that logs the exception in flight ($!)
# and, when there is one, each line of its backtrace.
def report_unexpected_exits
  trap('EXIT') do
    failure = $!
    log("EXIT #{failure.inspect}")
    failure.backtrace.each { |frame| log(frame) } if failure
  end
end

#run(ready_pipe = nil) ⇒ Object

Public: Starts the master process.



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resqued/master.rb', line 29

# Public: Starts the master process.
#
# ready_pipe - Optional IO (default: nil). When given, the master's pid is
#              written to it and it is closed before the main loop starts.
#
# The 'EXIT' reporting trap is removed only after the main loop and the
# pidfile block have finished without raising.
def run(ready_pipe = nil)
  report_unexpected_exits
  with_pidfile(@pidfile) do
    write_procline
    install_signal_handlers
    if ready_pipe
      ready_pipe.syswrite(Process.pid.to_s)
      begin
        ready_pipe.close
      rescue StandardError
        # Best effort: a failure to close the pipe is ignored.
        nil
      end
    end
    go_ham
  end
  no_more_unexpected_exits
end

#start_listener ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/resqued/master.rb', line 113

# Private: Start a new listener unless one is already current or the
# backoff says to wait.
#
# The new ListenerProxy receives the config paths, the flattened
# running_workers of every tracked listener, and a fresh listener id. It is
# registered in listener_pids under its pid once it has been run.
def start_listener
  return if @current_listener
  return if @listener_backoff.wait?

  inherited_workers = all_listeners.map { |l| l.running_workers }.flatten
  proxy = ListenerProxy.new(:config_paths => @config_paths,
                            :old_workers  => inherited_workers,
                            :listener_id  => next_listener_id)
  @current_listener = proxy
  proxy.run
  listener_status proxy, 'start'
  @listener_backoff.started
  listener_pids[proxy.pid] = proxy
  write_procline
end

#status_message(type, pid, status) ⇒ Object



264
265
266
267
268
# File 'lib/resqued/master.rb', line 264

# Private: Write one "type,pid,status" line to the status pipe.
#
# type   - The kind of process being reported.
# pid    - The pid of that process.
# status - The status text.
#
# Does nothing when no status pipe was configured.
def status_message(type, pid, status)
  return unless @status_pipe

  line = "#{type},#{pid},#{status}\n"
  @status_pipe.write(line)
end

#wait_for_workers ⇒ Object



189
190
191
# File 'lib/resqued/master.rb', line 189

# Private: Reap listeners using reap_all_listeners' default waitpid flags (0),
# i.e. without Process::WNOHANG. go_ham calls this during shutdown unless
# @fast_exit is set.
def wait_for_workers
  reap_all_listeners
end

#worker_finished(pid) ⇒ Object

Listener message: A worker just stopped working.

Forwards the message to the other listeners.



141
142
143
144
145
146
# File 'lib/resqued/master.rb', line 141

# Listener message: A worker just stopped working.
#
# pid - The pid reported for the worker.
#
# Reports a 'stop' worker status, then forwards the message to every
# tracked listener.
def worker_finished(pid)
  worker_status(pid, 'stop')
  all_listeners.each { |listener| listener.worker_finished(pid) }
end

#worker_started(pid) ⇒ Object

Listener message: A worker just started working.



134
135
136
# File 'lib/resqued/master.rb', line 134

# Listener message: A worker just started working.
#
# pid - The pid reported for the worker.
#
# Reports a 'start' worker status via worker_status.
def worker_started(pid)
  worker_status(pid, 'start')
end

#worker_status(pid, status) ⇒ Object



260
261
262
# File 'lib/resqued/master.rb', line 260

# Private: Report a worker's status through status_message, tagged 'worker'.
#
# pid    - The worker's pid.
# status - The status text ('start' and 'stop' are the values used here).
def worker_status(pid, status)
  status_message('worker', pid, status)
end

#write_procline ⇒ Object



250
251
252
# File 'lib/resqued/master.rb', line 250

# Private: Update the process title ($0) with the procline version, the
# number of listeners created so far, the number currently tracked, and
# the command-line arguments.
def write_procline
  generation = "[gen #{@listeners_created}]"
  running = "[#{listener_pids.size} running]"
  $0 = "#{procline_version} master #{generation} #{running} #{ARGV.join(' ')}"
end

#yawn(duration) ⇒ Object



246
247
248
# File 'lib/resqued/master.rb', line 246

# Private: Call the inherited yawn with the duration and the read_pipe of
# every tracked listener.
#
# duration - Passed through unchanged to the inherited yawn.
#
# NOTE(review): presumably the inherited yawn sleeps for up to `duration`
# and wakes early when one of the pipes has data — confirm against its
# definition, which is not shown here.
def yawn(duration)
  super(duration, all_listeners.map { |l| l.read_pipe })
end