Class: Resqued::ListenerProxy

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/resqued/listener_proxy.rb

Overview

Controls a listener process from the master process.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ ListenerProxy

Public.



13
14
15
# File 'lib/resqued/listener_proxy.rb', line 13

def initialize(options)
  @options = options
end

Instance Attribute Details

#pidObject (readonly)

Public: The pid of the running listener process.



31
32
33
# File 'lib/resqued/listener_proxy.rb', line 31

def pid
  @pid
end

Instance Method Details

#disposeObject

Public: wrap up all the things, this object is going home.



18
19
20
21
22
23
# File 'lib/resqued/listener_proxy.rb', line 18

def dispose
  if @master_socket
    @master_socket.close
    @master_socket = nil
  end
end

#kill(signal) ⇒ Object

Public: Stop the listener process.



53
54
55
56
# File 'lib/resqued/listener_proxy.rb', line 53

def kill(signal)
  log "kill -#{signal} #{pid}"
  Process.kill(signal.to_s, pid)
end

#read_pipeObject

Public: An IO to select on to check if there is incoming data available.



26
27
28
# File 'lib/resqued/listener_proxy.rb', line 26

def read_pipe
  @master_socket
end

#read_worker_status(options) ⇒ Object

Public: Check for updates on running worker information.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/resqued/listener_proxy.rb', line 69

def read_worker_status(options)
  on_activity = options[:on_activity]
  until @master_socket.nil?
    IO.select([@master_socket], nil, nil, 0) or return
    case line = @master_socket.readline
    when /^\+(\d+),(.*)$/
      worker_pids[$1] = $2
      on_activity.worker_started($1) if on_activity
    when /^-(\d+)$/
      worker_pids.delete($1)
      on_activity.worker_finished($1) if on_activity
    when /^RUNNING/
      on_activity.listener_running(self) if on_activity
    when ''
      break
    else
      log "Malformed data from listener: #{line.inspect}"
    end
  end
rescue EOFError, Errno::ECONNRESET
  @master_socket.close
  @master_socket = nil
end

#runObject

Public: Start the listener process.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/resqued/listener_proxy.rb', line 34

def run
  return if pid
  listener_socket, master_socket = UNIXSocket.pair
  if @pid = fork
    # master
    listener_socket.close
    master_socket.close_on_exec = true
    log "Started listener #{@pid}"
    @master_socket = master_socket
  else
    # listener
    master_socket.close
    Master::TRAPS.each { |signal| trap(signal, 'DEFAULT') rescue nil }
    Listener.new(@options.merge(:socket => listener_socket)).exec
    exit
  end
end

#running_workersObject

Public: Get the list of workers running from this listener.



59
60
61
# File 'lib/resqued/listener_proxy.rb', line 59

def running_workers
  worker_pids.map { |pid, queue| { :pid => pid, :queue => queue } }
end

#worker_finished(pid) ⇒ Object

Public: Tell the listener process that a worker finished.



94
95
96
97
98
99
100
# File 'lib/resqued/listener_proxy.rb', line 94

def worker_finished(pid)
  return if @master_socket.nil?
  @master_socket.puts(pid)
rescue Errno::EPIPE
  @master_socket.close
  @master_socket = nil
end

#worker_pidsObject

Private: Map worker pids to queue names



64
65
66
# File 'lib/resqued/listener_proxy.rb', line 64

def worker_pids
  @worker_pids ||= {}
end