Class: EventMachineMini
- Inherits:
-
Object
- Object
- EventMachineMini
- Defined in:
- bin/httphere
Constant Summary collapse
# Default SSL options, populated lazily on first lookup.
#
# Looking up :SSLCertificate reads the file named by :SSLCertificateFile and
# caches the parsed OpenSSL::X509::Certificate under :SSLCertificate.
# Looking up :SSLPrivateKey reads the file named by :SSLPrivateKeyFile and
# caches the parsed key under :SSLPrivateKey. Any other missing key yields
# nil and is not stored.
#
# The matching *File entry must be assigned before the derived key is read;
# otherwise File.read is handed nil and raises.
DEFAULT_SSL_OPTIONS = Hash.new do |opts, key|
  case key
  when :SSLCertificate
    opts[key] = OpenSSL::X509::Certificate.new(File.read(opts[:SSLCertificateFile]))
  when :SSLPrivateKey
    # OpenSSL::PKey.read picks the key class from the file contents, so EC and
    # other non-RSA keys load too; RSA keys still come back as OpenSSL::PKey::RSA.
    opts[key] = OpenSSL::PKey.read(File.read(opts[:SSLPrivateKeyFile]))
  end
end
Instance Attribute Summary collapse
-
#router ⇒ Object
readonly
Returns the value of attribute router.
Class Method Summary collapse
Instance Method Summary collapse
- #clients ⇒ Object
- #close_all_clients! ⇒ Object
- #connections_by_key ⇒ Object
-
#initialize(routes = {}) ⇒ EventMachineMini
constructor
A new instance of EventMachineMini.
- #run ⇒ Object
- #running? ⇒ Boolean
- #shutdown_listeners! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(routes = {}) ⇒ EventMachineMini
Returns a new instance of EventMachineMini.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'bin/httphere', line 114
#
# Builds the socket tables from +routes+.
#
# routes[:listen]     - Hash of "ip:port" => handler class. A TCPServer is
#                       bound for each entry and stored in @router.
# routes[:ssl_listen] - Same shape; the TCPServer is wrapped in an
#                       OpenSSL::SSL::SSLServer built with ssl_context.
# routes[:connect]    - Hash of "ip:port" => handler class, or
#                       [handler class, *extra args]. A TCPSocket is opened
#                       right away and the handler is instantiated with
#                       (self, socket, *extra args) and stored in clients.
#
# Any missing section is treated as empty.
def initialize(routes = {})
  @router = {}

  # Shared by the plain and SSL listeners: bind "ip:port" and set SO_REUSEADDR.
  bind_listener = lambda do |ip_port|
    ip, port = ip_port.split(/:/)
    server = TCPServer.new(ip, port)
    server.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
    server
  end

  # Set up the listening sockets
  (routes[:listen] || {}).each do |ip_port, instantiate_klass|
    @router[bind_listener.call(ip_port)] = instantiate_klass
  end

  # Set up the listening SSL sockets
  (routes[:ssl_listen] || {}).each do |ip_port, instantiate_klass|
    ssl_socket = ::OpenSSL::SSL::SSLServer.new(bind_listener.call(ip_port), ssl_context)
    ssl_socket.start_immediately = self.class.ssl_config[:SSLStartImmediately]
    @router[ssl_socket] = instantiate_klass
  end

  # Set up the connect sockets
  (routes[:connect] || {}).each do |ip_port, args|
    # Destructure instead of shift-ing so the caller's array is left intact
    # and the same routes hash can be reused.
    instantiate_klass, *handler_args = args.is_a?(Array) ? args : [args]
    ip, port = ip_port.split(/:/)
    socket = TCPSocket.new(ip, port)
    clients[socket] = instantiate_klass.new(self, socket, *handler_args)
  end
end
Instance Attribute Details
#router ⇒ Object (readonly)
Returns the value of attribute router.
113 114 115 |
# File 'bin/httphere', line 113
#
# Read-only accessor for the @router instance variable.
def router
  @router
end
Class Method Details
.ssl_config(config = DEFAULT_SSL_OPTIONS) ⇒ Object
101 102 103 |
# File 'bin/httphere', line 101
#
# Returns the memoized SSL configuration. The first call stores +config+
# (DEFAULT_SSL_OPTIONS unless given); later calls return the stored value
# and ignore their argument.
def ssl_config(config = DEFAULT_SSL_OPTIONS)
  @ssl_config = config unless @ssl_config
  @ssl_config
end
Instance Method Details
#clients ⇒ Object
230 231 232 |
# File 'bin/httphere', line 230
#
# Hash of client socket => connection handler, created empty on first use.
def clients
  @clients = {} unless @clients
  @clients
end
#close_all_clients! ⇒ Object
238 239 240 241 |
# File 'bin/httphere', line 238
#
# Closes every socket returned by client_sockets, ignoring any StandardError
# raised by an individual close so the remaining sockets are still closed.
def close_all_clients!
  # puts "Closing #{client_sockets.length} client connections..."
  client_sockets.each do |sock|
    begin
      sock.close
    rescue StandardError
      nil
    end
  end
end
#connections_by_key ⇒ Object
234 235 236 |
# File 'bin/httphere', line 234
#
# Hash of connection key => connection, created empty on first use.
def connections_by_key
  @connections_by_key = {} unless @connections_by_key
  @connections_by_key
end
#run ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'bin/httphere', line 146
#
# Runs the event loop on the calling thread until it is stopped.
#
# Each pass:
#   1. releases clients whose socket is already closed,
#   2. if the loop is no longer running?, closes the listeners, then the
#      clients, and exits once no client sockets remain,
#   3. waits up to 0.5s in select on all listening and client sockets,
#   4. on timeout calls +tick+ on every routed handler class that responds to
#      it; otherwise dispatches each readable socket.
#
# Handlers may `throw :stop_reading` from receive_data to abandon the current
# read burst.
def run
  Thread.current[:status] = :running
  # trap("INT") { stop! } # This will end the event loop within 0.5 seconds when you hit Ctrl+C
  loop do
    reap_closed_clients

    unless running?
      shutdown_listeners! unless listen_sockets.empty?
      # Clients are closed on one pass and reaped on the next; only then is
      # the list empty and the loop allowed to end.
      break if client_sockets.empty?
      close_all_clients!
    end

    begin
      event = select(listen_sockets + client_sockets, nil, nil, 0.5)
    rescue IOError
      # e.g. one of the sockets was closed after the list was built; start
      # over so the next pass can reap it.
      next
    end

    if event.nil?
      # Timeout: nothing readable, give the handler classes a periodic tick.
      @router.values.each { |klass| klass.tick if klass.respond_to?(:tick) }
    else
      event[0].each { |sock| dispatch_socket_event(sock) }
    end
  end
end

# Releases every registered client whose socket has already been closed.
# Works on a snapshot of the keys so the clients hash is not modified while
# it is being iterated.
def reap_closed_clients
  clients.keys.select(&:closed?).each { |sock| release_client(sock) }
end

# Single teardown path for a client: close the socket, unregister it, notify
# the handler once via upon_unbind and drop its connections_by_key entry.
# Because the socket is unregistered here, a later reaping pass cannot fire
# upon_unbind a second time.
def release_client(sock)
  sock.close unless sock.closed?
  conn = clients.delete(sock)
  conn.upon_unbind if conn.respond_to?(:upon_unbind)
  return unless conn.respond_to?(:key)

  begin
    connections_by_key.delete(conn.key)
  rescue StandardError
    nil # best effort, as in the original cleanup
  end
end

# Handles one readable socket: accept on a listener, tear down on client EOF,
# otherwise read the pending data.
def dispatch_socket_event(sock)
  if listen_sockets.include?(sock)
    # Received a new connection to a listening socket
    client_sock = accept_client(sock)
    clients[client_sock].upon_new_connection if clients[client_sock].respond_to?(:upon_new_connection)
  elsif sock.eof?
    # Socket's been closed by the client
    log "Connection #{clients[sock].to_s} was closed by the client.\n"
    release_client(sock)
  else
    read_from_client(sock)
  end
end

# Feeds pending data from +sock+ to its handler, up to ten 4096-byte
# non-blocking reads per event (or a single sysread when read_nonblock is
# unavailable).
def read_from_client(sock)
  catch :stop_reading do
    begin
      if sock.respond_to?(:read_nonblock)
        10.times do
          data = sock.read_nonblock(4096)
          clients[sock].receive_data(data)
        end
      else
        data = sock.sysread(4096)
        clients[sock].receive_data(data)
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, EOFError, IO::WaitReadable
      # no-op. This will likely happen after every request, but that's
      # expected: it means the pending data has been drained.
      # IO::WaitReadable also covers SSL sockets, whose "would block" error
      # is not an Errno::EAGAIN.
    rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ECONNREFUSED, IOError => e
      log "Closed Err: #{e.inspect}\n"
      release_client(sock)
    end
  end
end

private :reap_closed_clients, :release_client, :dispatch_socket_event, :read_from_client
#running? ⇒ Boolean
106 107 108 |
# File 'bin/httphere', line 106
#
# True while the :status entry of Thread.current is :running. The flag is
# read from the calling thread, so it reflects that thread's loop only.
def running?
  status = Thread.current[:status]
  status == :running
end
#shutdown_listeners! ⇒ Object
242 243 244 245 |
# File 'bin/httphere', line 242
#
# Closes every socket returned by listen_sockets, ignoring any StandardError
# raised by an individual close so the remaining listeners are still closed.
def shutdown_listeners!
  # puts "Shutting down #{listen_sockets.length} listeners..."
  listen_sockets.each do |listener|
    begin
      listener.close
    rescue StandardError
      nil
    end
  end
end
#stop! ⇒ Object
109 110 111 |
# File 'bin/httphere', line 109
#
# Asks the event loop to wind down by setting the :status entry of
# Thread.current to :stop. The flag is written on the calling thread.
def stop!
  Thread.current[:status] = :stop
end