Class: EventMachineMini

Inherits:
Object
  • Object
show all
Defined in:
bin/httphere

Constant Summary collapse

# Default SSL option table with lazily loaded credentials.
#
# Looking up :SSLCertificate or :SSLPrivateKey for the first time reads the
# file named by :SSLCertificateFile / :SSLPrivateKeyFile, parses it with
# OpenSSL and stores the parsed object in the hash so later lookups reuse it.
# Any other missing key simply yields nil and is not stored.
DEFAULT_SSL_OPTIONS = Hash.new { |options, key|
  if key == :SSLCertificate
    pem = File.read(options[:SSLCertificateFile])
    options[key] = OpenSSL::X509::Certificate.new(pem)
  elsif key == :SSLPrivateKey
    pem = File.read(options[:SSLPrivateKeyFile])
    options[key] = OpenSSL::PKey::RSA.new(pem)
  end
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(routes = {}) ⇒ EventMachineMini

Returns a new instance of EventMachineMini.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'bin/httphere', line 114

# Opens every socket described by +routes+.
#
# +routes+ is a Hash that may contain any of the following keys, each mapping
# an "ip:port" String to a value:
#
#   :listen     - a TCPServer is opened and the value is stored in @router,
#                 keyed by the listening socket.
#   :ssl_listen - like :listen, but the TCPServer is wrapped in an
#                 OpenSSL::SSL::SSLServer (using +ssl_context+ and the
#                 class-level +ssl_config+) and the wrapper is the key.
#   :connect    - a TCPSocket is opened immediately. The value is either a
#                 connection class or an Array of [klass, *extra_args]; the
#                 class is instantiated as klass.new(self, socket, *extra_args)
#                 and stored in +clients+, keyed by the socket.
#
# The caller's +routes+ structure is left untouched, so the same table can be
# used to build more than one instance.
def initialize(routes={})
  @router = {}
  # Set up the listening sockets
  (routes[:listen] || {}).each do |ip_port, instantiate_klass|
    ip, port = ip_port.split(/:/)
    socket = TCPServer.new(ip, port)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
    @router[socket] = instantiate_klass
    # puts "Listening on #{ip_port} for #{instantiate_klass.name} messages..."
  end
  # Set up the listening SSL sockets
  (routes[:ssl_listen] || {}).each do |ip_port, instantiate_klass|
    ip, port = ip_port.split(/:/)
    socket = TCPServer.new(ip, port)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)

    ssl_socket = ::OpenSSL::SSL::SSLServer.new(socket, ssl_context)
    ssl_socket.start_immediately = self.class.ssl_config[:SSLStartImmediately]

    @router[ssl_socket] = instantiate_klass
    # puts "Listening on #{ip_port} (SSL) for #{instantiate_klass.name} messages..."
  end
  # Set up the connect sockets
  (routes[:connect] || {}).each do |ip_port, args|
    # Destructure rather than shift: shifting would remove the class from the
    # caller's own Array and corrupt the routes table for any later use.
    instantiate_klass, *extra_args = args.is_a?(Array) ? args : [args]
    ip, port = ip_port.split(/:/)
    socket = TCPSocket.new(ip, port)
    clients[socket] = instantiate_klass.new(self, socket, *extra_args)
    # puts "Connecting to #{ip_port} for #{instantiate_klass.name} messages..."
  end
end

Instance Attribute Details

#router ⇒ Object (readonly)

Returns the value of attribute router.



113
114
115
# File 'bin/httphere', line 113

# Read-only accessor for @router, the table the constructor fills with
# listening sockets as keys and their routed values.
def router; @router; end

Class Method Details

.ssl_config(config = DEFAULT_SSL_OPTIONS) ⇒ Object



101
102
103
# File 'bin/httphere', line 101

# Returns the memoized SSL configuration.
#
# The first call stores +config+ (DEFAULT_SSL_OPTIONS when omitted); every
# later call returns that stored object and ignores its argument.
def ssl_config(config=DEFAULT_SSL_OPTIONS)
  @ssl_config = config unless @ssl_config
  @ssl_config
end

Instance Method Details

#clients ⇒ Object



230
231
232
# File 'bin/httphere', line 230

# Returns the Hash of client connections keyed by socket, creating an empty
# one on first use.
def clients
  @clients = {} unless @clients
  @clients
end

#close_all_clients! ⇒ Object



238
239
240
241
# File 'bin/httphere', line 238

# Closes every socket returned by +client_sockets+ on a best-effort basis:
# a StandardError raised while closing one socket is swallowed so the
# remaining sockets are still closed.
def close_all_clients!
  # puts "Closing #{client_sockets.length} client connections..."
  client_sockets.each do |sock|
    begin
      sock.close
    rescue StandardError
      nil
    end
  end
end

#connections_by_key ⇒ Object



234
235
236
# File 'bin/httphere', line 234

# Returns the Hash used to look connections up by their +key+, creating an
# empty one on first use.
def connections_by_key
  @connections_by_key = {} unless @connections_by_key
  @connections_by_key
end

#run ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'bin/httphere', line 146

# Runs the event loop on the calling thread.
#
# Sets Thread.current[:status] to :running and then loops. Each pass:
#
# 1. Unbinds every client whose socket is already closed.
# 2. If +running?+ is false: closes the listeners, then the client sockets,
#    and leaves the loop on the pass where no client sockets remain.
# 3. Waits up to 0.5 seconds for a socket to become readable.
#    * On timeout, calls +tick+ on every routed value that responds to it.
#    * For a listening socket, accepts the client via +accept_client+ and
#      calls +upon_new_connection+ on the new connection if supported.
#    * For a client socket, either detects EOF or reads pending data and
#      passes it to the connection's +receive_data+. A handler may
#      `throw :stop_reading` to abandon the current read burst.
#
# Whenever a client goes away (socket found closed, EOF, or a connection
# error while reading) it is torn down exactly once through +unbind_client+.
def run
  Thread.current[:status] = :running
  # trap("INT") { stop! } # This will end the event loop within 0.5 seconds when you hit Ctrl+C
  loop do
    # log "tick #{Thread.current[:status]}\n" if $DEBUG

    # Clean up any closed clients. Walk a snapshot of the keys so entries are
    # not removed from the Hash while it is being iterated.
    clients.keys.each { |sock| unbind_client(sock) if sock.closed? }

    unless running?
      # puts "Closing all listening ports."
      shutdown_listeners! unless listen_sockets.empty?
      # It's the next time around after we closed all the client connections.
      break if client_sockets.empty?
      # puts "Closing all client connections."
      close_all_clients!
    end

    # puts "Listening to #{listen_sockets.length} sockets: #{listen_sockets.inspect}"
    begin
      event = select(listen_sockets + client_sockets, nil, nil, 0.5)
    rescue IOError
      # A socket in the list was closed; the next pass cleans it up.
      next
    end

    if event.nil?
      # select timed out after 0.5s with no activity: use the idle time to
      # give routed values a periodic tick.
      @router.values.each { |klass| klass.tick if klass.respond_to?(:tick) }
      next
    end

    event[0].each do |sock| # Iterate through all sockets that have pending activity
      # puts "Event on socket #{sock.inspect}"
      if listen_sockets.include?(sock) # Received a new connection to a listening socket
        sock = accept_client(sock)
        clients[sock].upon_new_connection if clients[sock].respond_to?(:upon_new_connection)
      elsif sock.eof? # Socket's been closed by the client
        log "Connection #{clients[sock].to_s} was closed by the client.\n"
        sock.close
        unbind_client(sock)
      else # Data in from the client
        catch :stop_reading do
          # puts "Reading data from socket #{sock.inspect} / #{clients[sock].inspect}"
          begin
            if sock.respond_to?(:read_nonblock)
              # Drain up to ten 4 KiB chunks; running dry raises EAGAIN below.
              10.times do
                data = sock.read_nonblock(4096)
                clients[sock].receive_data(data)
              end
            else
              data = sock.sysread(4096)
              clients[sock].receive_data(data)
            end
          rescue Errno::EAGAIN, Errno::EWOULDBLOCK, EOFError
            # no-op. This will likely happen after every request, but that's expected.
            # It ensures that we're done with the request's data.
          rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ECONNREFUSED, IOError => e
            log "Closed Err: #{e.inspect}\n"
            sock.close unless sock.closed?
            # Unbind right here so the cleanup at the top of the next pass
            # does not notify the connection a second time.
            unbind_client(sock)
          end
        end
      end
    end
  end
end

# Removes +sock+ from +clients+ and tears its connection down: calls
# +upon_unbind+ if the connection supports it and drops the connection from
# +connections_by_key+ if it exposes a +key+. Does nothing when +sock+ has no
# registered connection, which makes repeated calls harmless.
def unbind_client(sock)
  conn = clients.delete(sock)
  return unless conn

  conn.upon_unbind if conn.respond_to?(:upon_unbind)
  return unless conn.respond_to?(:key)

  begin
    connections_by_key.delete(conn.key)
  rescue StandardError
    nil # best effort: a failing #key must not take the event loop down
  end
end
private :unbind_client

#running? ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'bin/httphere', line 106

# True while the calling thread's :status thread-local is :running.
def running?
  status = Thread.current[:status]
  status == :running
end

#shutdown_listeners! ⇒ Object



242
243
244
245
# File 'bin/httphere', line 242

# Closes every socket returned by +listen_sockets+ on a best-effort basis:
# a StandardError raised while closing one listener is swallowed so the
# remaining listeners are still closed.
def shutdown_listeners!
  # puts "Shutting down #{listen_sockets.length} listeners..."
  listen_sockets.each do |listener|
    begin
      listener.close
    rescue StandardError
      nil
    end
  end
end

#stop! ⇒ Object



109
110
111
# File 'bin/httphere', line 109

# Marks the calling thread's :status thread-local as :stop and returns :stop.
# Only the calling thread's status is changed.
def stop!
  current = Thread.current
  current[:status] = :stop
end