Module: Pipemaster::SocketHelper

Includes:
Socket::Constants
Included in:
Server
Defined in:
lib/pipemaster/socket_helper.rb

Instance Method Summary collapse

Instance Method Details

#bind_listen(address = '0.0.0.0:8080', opt = {}) ⇒ Object

creates a new server, socket. address may be a HOST:PORT or an absolute path to a UNIX socket. address can even be a Socket object in which case it is immediately returned



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/pipemaster/socket_helper.rb', line 86

def bind_listen(address = '0.0.0.0:8080', opt = {})
  return address unless String === address

  sock = if address[0] == ?/
    if File.exist?(address)
      if File.socket?(address)
        if self.respond_to?(:logger)
          logger.info "unlinking existing socket=#{address}"
        end
        File.unlink(address)
      else
        raise ArgumentError,
              "socket=#{address} specified but it is not a socket!"
      end
    end
    old_umask = File.umask(opt[:umask] || 0)
    begin
      UNIXServer.new(address)
    ensure
      File.umask(old_umask)
    end
  elsif address =~ /^(\d+\.\d+\.\d+\.\d+):(\d+)$/
    TCPServer.new($1, $2.to_i)
  else
    raise ArgumentError, "Don't know how to bind: #{address}"
  end
  set_server_sockopt(sock, opt)
  sock
end

#log_buffer_sizes(sock, pfx = '') ⇒ Object



76
77
78
79
80
81
# File 'lib/pipemaster/socket_helper.rb', line 76

def log_buffer_sizes(sock, pfx = '')
  respond_to?(:logger) or return
  rcvbuf = sock.getsockopt(SOL_SOCKET, SO_RCVBUF).unpack('i')
  sndbuf = sock.getsockopt(SOL_SOCKET, SO_SNDBUF).unpack('i')
  logger.info "#{pfx}#{sock_name(sock)} rcvbuf=#{rcvbuf} sndbuf=#{sndbuf}"
end

#server_cast(sock) ⇒ Object

casts a given Socket to be a TCPServer or UNIXServer



138
139
140
141
142
143
144
145
# File 'lib/pipemaster/socket_helper.rb', line 138

def server_cast(sock)
  begin
    Socket.unpack_sockaddr_in(sock.getsockname)
    TCPServer.for_fd(sock.fileno)
  rescue ArgumentError
    UNIXServer.for_fd(sock.fileno)
  end
end

#set_server_sockopt(sock, opt) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/pipemaster/socket_helper.rb', line 57

def set_server_sockopt(sock, opt)
  opt ||= {}

  TCPSocket === sock and set_tcp_sockopt(sock, opt)

  if opt[:rcvbuf] || opt[:sndbuf]
    log_buffer_sizes(sock, "before: ")
    sock.setsockopt(SOL_SOCKET, SO_RCVBUF, opt[:rcvbuf]) if opt[:rcvbuf]
    sock.setsockopt(SOL_SOCKET, SO_SNDBUF, opt[:sndbuf]) if opt[:sndbuf]
    log_buffer_sizes(sock, " after: ")
  end
  sock.listen(opt[:backlog] || 1024)
  rescue => e
    if respond_to?(:logger)
      logger.error "error setting socket options: #{e.inspect}"
      logger.error e.backtrace.join("\n")
    end
end

#set_tcp_sockopt(sock, opt) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pipemaster/socket_helper.rb', line 33

def set_tcp_sockopt(sock, opt)

  # highly portable, but off by default because we don't do keepalive
  if defined?(TCP_NODELAY) && ! (val = opt[:tcp_nodelay]).nil?
    sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, val ? 1 : 0)
  end

  unless (val = opt[:tcp_nopush]).nil?
    val = val ? 1 : 0
    if defined?(TCP_CORK) # Linux
      sock.setsockopt(IPPROTO_TCP, TCP_CORK, val)
    elsif defined?(TCP_NOPUSH) # TCP_NOPUSH is untested (FreeBSD)
      sock.setsockopt(IPPROTO_TCP, TCP_NOPUSH, val)
    end
  end

  # No good reason to ever have deferred accepts off
  if defined?(TCP_DEFER_ACCEPT)
    sock.setsockopt(SOL_TCP, TCP_DEFER_ACCEPT, 1)
  elsif defined?(SO_ACCEPTFILTER) && defined?(FILTER_ARG)
    sock.setsockopt(SOL_SOCKET, SO_ACCEPTFILTER, FILTER_ARG)
  end
end

#sock_name(sock) ⇒ Object

Returns the configuration name of a socket as a string. sock may be a string value, in which case it is returned as-is Warning: TCP sockets may not always return the name given to it.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/pipemaster/socket_helper.rb', line 119

def sock_name(sock)
  case sock
  when String then sock
  when UNIXServer
    Socket.unpack_sockaddr_un(sock.getsockname)
  when TCPServer
    Socket.unpack_sockaddr_in(sock.getsockname).reverse!.join(':')
  when Socket
    begin
      Socket.unpack_sockaddr_in(sock.getsockname).reverse!.join(':')
    rescue ArgumentError
      Socket.unpack_sockaddr_un(sock.getsockname)
    end
  else
    raise ArgumentError, "Unhandled class #{sock.class}: #{sock.inspect}"
  end
end