Class: Resqued::ListenerProxy
- Inherits:
-
Object
- Object
- Resqued::ListenerProxy
- Includes:
- Logging
- Defined in:
- lib/resqued/listener_proxy.rb
Overview
Controls a listener process from the master process.
Instance Attribute Summary collapse
-
#pid ⇒ Object
readonly
Public: The pid of the running listener process.
Instance Method Summary collapse
-
#dispose ⇒ Object
Public: wrap up all the things, this object is going home.
-
#initialize(options) ⇒ ListenerProxy
constructor
Public.
-
#kill(signal) ⇒ Object
Public: Stop the listener process.
-
#read_pipe ⇒ Object
Public: An IO to select on to check if there is incoming data available.
-
#read_worker_status(options) ⇒ Object
Public: Check for updates on running worker information.
-
#run ⇒ Object
Public: Start the listener process.
-
#running_workers ⇒ Object
Public: Get the list of workers running from this listener.
-
#worker_finished(pid) ⇒ Object
Public: Tell the listener process that a worker finished.
-
#worker_pids ⇒ Object
Private: Map worker pids to queue names.
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ ListenerProxy
Public.
13 14 15 |
# File 'lib/resqued/listener_proxy.rb', line 13 def initialize() @options = end |
Instance Attribute Details
#pid ⇒ Object (readonly)
Public: The pid of the running listener process.
31 32 33 |
# File 'lib/resqued/listener_proxy.rb', line 31 def pid @pid end |
Instance Method Details
#dispose ⇒ Object
Public: wrap up all the things, this object is going home.
18 19 20 21 22 23 |
# File 'lib/resqued/listener_proxy.rb', line 18 def dispose if @master_socket @master_socket.close @master_socket = nil end end |
#kill(signal) ⇒ Object
Public: Stop the listener process.
53 54 55 56 |
# File 'lib/resqued/listener_proxy.rb', line 53 def kill(signal) log "kill -#{signal} #{pid}" Process.kill(signal.to_s, pid) end |
#read_pipe ⇒ Object
Public: An IO to select on to check if there is incoming data available.
26 27 28 |
# File 'lib/resqued/listener_proxy.rb', line 26 def read_pipe @master_socket end |
#read_worker_status(options) ⇒ Object
Public: Check for updates on running worker information.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/resqued/listener_proxy.rb', line 69 def read_worker_status() on_activity = [:on_activity] until @master_socket.nil? IO.select([@master_socket], nil, nil, 0) or return case line = @master_socket.readline when /^\+(\d+),(.*)$/ worker_pids[$1] = $2 on_activity.worker_started($1) if on_activity when /^-(\d+)$/ worker_pids.delete($1) on_activity.worker_finished($1) if on_activity when /^RUNNING/ on_activity.listener_running(self) if on_activity when '' break else log "Malformed data from listener: #{line.inspect}" end end rescue EOFError, Errno::ECONNRESET @master_socket.close @master_socket = nil end |
#run ⇒ Object
Public: Start the listener process.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/resqued/listener_proxy.rb', line 34 def run return if pid listener_socket, master_socket = UNIXSocket.pair if @pid = fork # master listener_socket.close master_socket.close_on_exec = true log "Started listener #{@pid}" @master_socket = master_socket else # listener master_socket.close Master::TRAPS.each { |signal| trap(signal, 'DEFAULT') rescue nil } Listener.new(@options.merge(:socket => listener_socket)).exec exit end end |
#running_workers ⇒ Object
Public: Get the list of workers running from this listener.
59 60 61 |
# File 'lib/resqued/listener_proxy.rb', line 59 def running_workers worker_pids.map { |pid, queue| { :pid => pid, :queue => queue } } end |
#worker_finished(pid) ⇒ Object
Public: Tell the listener process that a worker finished.
94 95 96 97 98 99 100 |
# File 'lib/resqued/listener_proxy.rb', line 94 def worker_finished(pid) return if @master_socket.nil? @master_socket.puts(pid) rescue Errno::EPIPE @master_socket.close @master_socket = nil end |
#worker_pids ⇒ Object
Private: Map worker pids to queue names
64 65 66 |
# File 'lib/resqued/listener_proxy.rb', line 64 def worker_pids @worker_pids ||= {} end |