Class: Resqued::ListenerProxy
- Inherits:
-
Object
- Object
- Resqued::ListenerProxy
- Includes:
- Logging
- Defined in:
- lib/resqued/listener_proxy.rb
Overview
Controls a listener process from the master process.
Instance Attribute Summary collapse
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
-
#dispose ⇒ Object
Public: wrap up all the things, this object is going home.
-
#initialize(state) ⇒ ListenerProxy
constructor
Public.
-
#kill(signal) ⇒ Object
Public: Stop the listener process.
-
#pid ⇒ Object
Public: The pid of the running listener process.
-
#read_pipe ⇒ Object
Public: An IO to select on to check if there is incoming data available.
-
#read_worker_status(options) ⇒ Object
Public: Check for updates on running worker information.
-
#run ⇒ Object
Public: Start the listener process.
-
#running_workers ⇒ Object
Public: Get the list of workers running from this listener.
-
#worker_finished(pid) ⇒ Object
Public: Tell the listener process that a worker finished.
-
#worker_pids ⇒ Object
Private: Map worker pids to queue names.
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(state) ⇒ ListenerProxy
Public.
13 14 15 |
# File 'lib/resqued/listener_proxy.rb', line 13
# Public: Set up the proxy.
#
# state - The object holding this listener's data. It is stored as-is in
#         @state; nothing is read from it or validated here.
def initialize(state)
  @state = state
end
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
17 18 19 |
# File 'lib/resqued/listener_proxy.rb', line 17
# Public: Returns the value of attribute state (the @state instance variable).
def state
  @state
end
Instance Method Details
#dispose ⇒ Object
Public: wrap up all the things, this object is going home.
20 21 22 23 24 25 |
# File 'lib/resqued/listener_proxy.rb', line 20
# Public: wrap up all the things, this object is going home.
#
# Closes the state's master socket, if there is one, and clears the reference
# so that later calls find nothing left to close.
#
# Returns nil.
def dispose
  socket = @state.master_socket
  return unless socket

  socket.close
  @state.master_socket = nil
end
#kill(signal) ⇒ Object
Public: Stop the listener process.
58 59 60 61 |
# File 'lib/resqued/listener_proxy.rb', line 58
# Public: Stop the listener process.
#
# signal - The signal to send. It is converted with #to_s before being handed
#          to Process.kill, so a Symbol or a String both work.
#
# Logs the equivalent kill command, then signals the process identified by
# #pid. Returns whatever Process.kill returns.
def kill(signal)
  log "kill -#{signal} #{pid}"
  Process.kill(signal.to_s, pid)
end
#pid ⇒ Object
Public: The pid of the running listener process.
33 34 35 |
# File 'lib/resqued/listener_proxy.rb', line 33
# Public: The pid of the running listener process.
#
# Returns whatever the state object reports as its pid.
def pid
  @state.pid
end
#read_pipe ⇒ Object
Public: An IO to select on to check if there is incoming data available.
28 29 30 |
# File 'lib/resqued/listener_proxy.rb', line 28
# Public: An IO to select on to check if there is incoming data available.
#
# Returns the state's master socket (nil when the state has none).
def read_pipe
  @state.master_socket
end
#read_worker_status(options) ⇒ Object
Public: Check for updates on running worker information.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/resqued/listener_proxy.rb', line 74
# Public: Check for updates on running worker information.
#
# options - A Hash of options:
#           :on_activity - Optional object that is told about each update via
#                          #worker_started(pid), #worker_finished(pid) and
#                          #listener_running(listener_proxy). May be nil.
#
# Reads every line that is already waiting on the master socket and never
# waits for more to arrive. The lines handled are:
#
#   "+<pid>,<queue>"  - a worker started; recorded in worker_pids
#   "-<pid>"          - a worker finished; removed from worker_pids
#   "RUNNING..."      - the listener reports that it is running
#
# Anything else is logged as malformed. When the socket hits end-of-file or
# the connection is reset, the socket is closed and cleared from the state.
#
# Returns nil.
def read_worker_status(options)
  on_activity = options[:on_activity]
  until @state.master_socket.nil?
    # Zero timeout: this is a poll. Leave as soon as nothing is readable.
    return unless IO.select([@state.master_socket], nil, nil, 0)

    case line = @state.master_socket.readline
    when /^\+(\d+),(.*)$/
      # pids are kept as the Strings captured from the line.
      started_pid, queue_key = Regexp.last_match.captures
      worker_pids[started_pid] = queue_key
      on_activity&.worker_started(started_pid)
    when /^-(\d+)$/
      finished_pid = Regexp.last_match(1)
      worker_pids.delete(finished_pid)
      on_activity&.worker_finished(finished_pid)
    when /^RUNNING/
      on_activity&.listener_running(self)
    when ""
      break
    else
      log "Malformed data from listener: #{line.inspect}"
    end
  end
rescue EOFError, Errno::ECONNRESET
  @state.master_socket.close
  @state.master_socket = nil
end
#run ⇒ Object
Public: Start the listener process.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/resqued/listener_proxy.rb', line 38
# Public: Start the listener process.
#
# Does nothing when #pid is already set. Otherwise creates a connected pair
# of UNIX sockets and forks:
#
# - The master (parent) keeps master_socket, marks it close-on-exec, records
#   the child's pid in the state and logs the start.
# - The listener (child) keeps listener_socket, resets every signal listed in
#   Master::TRAPS to its default handler, runs a Listener and then exits.
#
# NOTE(review): the extracted page read `@state..merge(...)`, with the name
# between the two dots lost. `options` is restored here because the same word
# was dropped from read_worker_status on this page - confirm against the
# repository.
def run
  return if pid

  listener_socket, master_socket = UNIXSocket.pair
  @state.pid = fork
  if @state.pid
    # master
    listener_socket.close
    master_socket.close_on_exec = true
    log "Started listener #{@state.pid}"
    @state.master_socket = master_socket
  else
    # listener
    master_socket.close
    Master::TRAPS.each do |signal|
      begin
        trap(signal, "DEFAULT")
      rescue StandardError
        # Deliberately best-effort: a signal that cannot be reset is skipped.
        nil
      end
    end
    Listener.new(@state.options.merge(socket: listener_socket)).exec
    exit
  end
end
#running_workers ⇒ Object
Public: Get the list of workers running from this listener.
64 65 66 |
# File 'lib/resqued/listener_proxy.rb', line 64
# Public: Get the list of workers running from this listener.
#
# Returns an Array with one Hash per entry of worker_pids, each of the form
# { pid: <pid>, queue_key: <queue key> }.
def running_workers
  worker_pids.map { |pid, queue_key| { pid: pid, queue_key: queue_key } }
end
#worker_finished(pid) ⇒ Object
Public: Tell the listener process that a worker finished.
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/resqued/listener_proxy.rb', line 99
# Public: Tell the listener process that a worker finished.
#
# pid - The pid of the worker that exited. It is written to the master socket
#       followed by a newline.
#
# Does nothing when there is no master socket. The write is non-blocking: if
# the socket is not writable right now the failure is only logged, and if the
# pipe is broken the socket is closed and cleared from the state.
def worker_finished(pid)
  return if @state.master_socket.nil?

  @state.master_socket.write_nonblock("#{pid}\n")
rescue IO::WaitWritable
  log "Couldn't tell #{@state.pid} that #{pid} exited!"
  # Ignore it, maybe the next time it'll work.
rescue Errno::EPIPE
  @state.master_socket.close
  @state.master_socket = nil
end
#worker_pids ⇒ Object
Private: Map worker pids to queue names
69 70 71 |
# File 'lib/resqued/listener_proxy.rb', line 69
# Private: Map worker pids to queue names.
#
# Returns the Hash stored in the state, first storing an empty Hash there if
# the state has none yet.
def worker_pids
  @state.worker_pids ||= {}
end