Class: DTAS::UNIXServer
- Inherits:
-
Object
- Object
- DTAS::UNIXServer
- Defined in:
- lib/dtas/unix_server.rb
Overview
The programming model for the event loop here aims to be compatible with EPOLLONESHOT use with epoll, since that fits my brain far better than existing evented APIs/frameworks. If we cared about scalability to thousands of clients, we’d really use epoll, but IO.select can be just as fast (or faster) with few descriptors and is obviously more portable.
Instance Attribute Summary collapse
-
#to_io ⇒ Object
readonly
:nodoc:.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(path) ⇒ UNIXServer
constructor
A new instance of UNIXServer.
- #readable_iter ⇒ Object
- #run_once ⇒ Object
- #wait_ctl(io, err) ⇒ Object
- #write_failed(client, e) ⇒ Object
Constructor Details
#initialize(path) ⇒ UNIXServer
Returns a new instance of UNIXServer.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/dtas/unix_server.rb', line 24 def initialize(path) @path = path # lock down access by default, arbitrary commands may run as the # same user dtas-player runs as: old_umask = File.umask(0077) @to_io = Socket.new(:UNIX, :SEQPACKET, 0) addr = Socket.pack_sockaddr_un(path) begin @to_io.bind(addr) rescue Errno::EADDRINUSE # maybe we have an old path leftover from a killed process tmp = Socket.new(:UNIX, :SEQPACKET, 0) begin tmp.connect(addr) raise RuntimeError, "socket `#{path}' is in use", [] rescue Errno::ECONNREFUSED # ok, leftover socket, unlink and rebind anyways File.unlink(path) @to_io.bind(addr) ensure tmp.close end end @to_io.listen(1024) @readers = { self => true } @writers = {} ensure File.umask(old_umask) end |
Instance Attribute Details
#to_io ⇒ Object (readonly)
:nodoc:
17 18 19 |
# File 'lib/dtas/unix_server.rb', line 17 def to_io @to_io end |
Instance Method Details
#close ⇒ Object
19 20 21 22 |
# File 'lib/dtas/unix_server.rb', line 19 def close File.unlink(@path) @to_io.close end |
#readable_iter ⇒ Object
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/dtas/unix_server.rb', line 59 def readable_iter # we do not do anything with the block passed to us begin sock, _ = @to_io.accept_nonblock @readers[DTAS::UNIXAccepted.new(sock)] = true rescue Errno::ECONNABORTED # ignore this, it happens rescue Errno::EAGAIN return :wait_readable end while true end |
#run_once ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/dtas/unix_server.rb', line 100 def run_once # give IO.select one-shot behavior, snapshot and replace the watchlist begin r = IO.select(@readers.keys, @writers.keys) or return rescue IOError # this only happens when sinks error out @writers.delete_if { |io| io.to_io.closed? } retry end @hot_read = r[0] r[1].each do |io| @writers.delete(io) wait_ctl(io, io.writable_iter) end @hot_read = nil r[0].each do |io| @readers.delete(io) wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) }) end end |
#wait_ctl(io, err) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/dtas/unix_server.rb', line 70 def wait_ctl(io, err) case err when :hot_read # this is only safe when we're iterating through ready writers # the linear search for Array#include? is not expensive since # we usually don't have a lot of sinks. @hot_read << io unless @hot_read.include?(io) when :wait_readable @readers[io] = true when :wait_writable @writers[io] = true when :delete @readers.delete(io) @writers.delete(io) when :ignore # There are 2 cases for :ignore # - DTAS::Buffer was readable before, but all destinations (e.g. sinks) # were blocked, so we stop caring for producer (buffer) readability. # - a consumer (e.g. DTAS::Sink) just became writable, but the # corresponding DTAS::Buffer was already readable in a previous # call. when nil io.close when StandardError io.close else raise "BUG: wait_ctl invalid: #{io} #{err.inspect}" end end |
#write_failed(client, e) ⇒ Object
54 55 56 57 |
# File 'lib/dtas/unix_server.rb', line 54 def write_failed(client, e) warn "failed to write to #{client}: #{e.} (#{e.class})" client.close end |