Class: Leech::Server

Inherits:
Object
Defined in:
lib/leech/server.rb

Overview

This is a simple TCP server, similar to Rack, but designed for asynchronous handling of short text commands. It can be used for monitoring purposes or for simple communication between a few machines.

### Creating server instance

Server can be created in two ways:

server = Leech::Server.new :port => 666
server.use :auth
server.run

…or using block style:

Leech::Server.new do 
  port 666
  use :auth
  run
end

### Complete server example

A simple server with authorization and a few command handlers can be configured like this:

server = Leech::Server.new do 
  # simple authorization handler, see Leech::AuthHandler for 
  # more information.
  use :auth 

  host 'localhost'
  port 666
  max_workers 100
  timeout 30
  logger Logger.new('/var/logs/leech/server.log')

  handle /^SHOW FILE (.*)$/ do |env,params|
    if File.exists?(params[1])
      answer(File.open(params[1]).read)
    else
      answer('NOT FOUND')
    end
  end

  handle /^DELETE FILE (.*)$/ do |env,params|
    answer(File.delete(params[1]))
  end

  run
end

# Now we have to join the server thread with the main thread.
server.join
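
On the wire, commands are plain text lines and answers come back on the same socket, so any TCP client can talk to the server. A minimal client sketch (the file path is only an example):

require 'socket'

socket = TCPSocket.new('localhost', 666)

# Send a command as a single text line...
socket.puts('SHOW FILE /etc/hostname')

# ...and read the server's answer.
puts socket.gets

socket.close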

Defined Under Namespace

Classes: Error, StopServer, TimeoutError

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(opts = {}, &block) ⇒ Server

Creates a working server on host:port. Use #run to start the server and `acceptor.join` to join the thread that's processing incoming requests on the socket.

Parameters:

  • opts (Hash) (defaults to: {})

    see #options

Options Hash (opts):

  • :host (String) — default: 'localhost'

    see #host

  • :port (Int) — default: 9933

    see #port

  • :logger (Logger) — default: Logger.new(STDOUT)

    see #logger

  • :max_workers (Int) — default: 100

    see #max_workers

  • :timeout (Int, Float) — default: 30

    see #timeout

  • :throttle (Int, Float) — default: 0

    see #throttle




# File 'lib/leech/server.rb', line 137

def initialize(opts={}, &block)
  @handlers    = []
  @workers     = ThreadGroup.new
  @acceptor    = nil
  @mutex       = Mutex.new
  
  @options     = opts
  @host        = opts[:host] || 'localhost'
  @port        = opts[:port] || 9933
  @logger      = opts[:logger] || Logger.new(STDOUT)
  @max_workers = opts[:max_workers] || 100
  @timeout     = opts[:timeout] || 30
  @throttle    = opts[:throttle].to_i / 100.0
  
  @inline_handler = Class.new(Leech::Handler)
  instance_eval(&block) if block_given? 
end
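
The same configuration can also be given as an options hash instead of a block; a sketch using the documented defaults:

require 'logger'

server = Leech::Server.new(
  :host        => 'localhost',
  :port        => 9933,
  :logger      => Logger.new(STDOUT),
  :max_workers => 100,
  :timeout     => 30,
  :throttle    => 0
)
server.run
server.join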

Instance Attribute Details

#acceptor ⇒ Thread (readonly)

Server main thread

Returns:

  • (Thread)


# File 'lib/leech/server.rb', line 70

def acceptor
  @acceptor
end

#host ⇒ String (readonly)

Server will bind to this host

Returns:

  • (String)


# File 'lib/leech/server.rb', line 80

def host
  @host
end

#logger ⇒ Logger (readonly)

Logging object

Returns:

  • (Logger)


# File 'lib/leech/server.rb', line 90

def logger
  @logger
end

#max_workers ⇒ Int, Float (readonly)

The maximum number of concurrent connections to process; anything over this limit is closed immediately to maintain server processing performance.

This may seem mean but it is the most efficient way to deal with overload.

Other schemes involve still parsing the client's request, which defeats the point of an overload handling system.

Returns:

  • (Int, Float)


# File 'lib/leech/server.rb', line 99

def max_workers
  @max_workers
end

#options ⇒ Hash (readonly)

Server configuration

Returns:

  • (Hash)


# File 'lib/leech/server.rb', line 75

def options
  @options
end

#port ⇒ Int (readonly)

Server will be listening on this port

Returns:

  • (Int)


# File 'lib/leech/server.rb', line 85

def port
  @port
end

#throttle ⇒ Int, Float (readonly)

A sleep timeout (in hundredths of a second) that is placed between socket.accept calls in order to give the server a cheap throttle time.

It defaults to 0; if it is 0, the sleep is not done at all.

Returns:

  • (Int, Float)


# File 'lib/leech/server.rb', line 112

def throttle
  @throttle
end
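
Note that the value is interpreted in hundredths of a second: #initialize divides opts[:throttle].to_i by 100.0, so a throttle of 50 means a 0.5 second sleep between accept calls. For example:

# Sleep 0.5 s (50 hundredths of a second) between socket.accept calls.
server = Leech::Server.new(:port => 9933, :throttle => 50)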

#timeout ⇒ Int, Float (readonly)

Maximum idle time (in seconds)

Returns:

  • (Int, Float)


# File 'lib/leech/server.rb', line 104

def timeout
  @timeout
end

Instance Method Details

#answer(msg) ⇒ Object Also known as: say

Sends an answer to the currently connected client socket. This method should be called only from a worker thread.

Parameters:

  • msg (String)

    Text to send



# File 'lib/leech/server.rb', line 339

def answer(msg)
  logger.debug("Answering to (#{info[:uri]}): #{msg.chomp.strip}")
  client.puts(msg)
end
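
In practice, #answer (or its alias #say) is called from inside a handler block registered with #handle. A minimal sketch (the PING/PONG command is purely illustrative):

Leech::Server.new do
  port 9933
  handle /^PING$/ do |env,params|
    # Reply on the same client socket that sent the command.
    answer('PONG')
  end
  run
end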

#graceful_shutdown ⇒ Object

Performs a wait on all the currently running threads and kills any that take too long. It waits by `@timeout` seconds, which can be set in #initialize. The `@throttle` setting extends this waiting period by that much longer.



# File 'lib/leech/server.rb', line 237

def graceful_shutdown
  while reap_dead_workers("shutdown") > 0
    logger.error "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout + @throttle} seconds."
    sleep @timeout / 10
  end
end

#handle(pattern, &block) ⇒ Object

Defines a matching pattern in the @inline_handler. It is similar to `Leech::Handler#handle` but can be used only in block style.

Examples:

Leech::Server.new do
  handle(/^PRINT (.*)$/) {|env,params| print params[1]}
  handle(/^DELETE FILE (.*)$/) {|env,params| File.delete(params[1])}
end

Parameters:

  • pattern (Regexp)

    The block will be executed for every received command that matches this pattern.

See Also:

  • Handler#handle


# File 'lib/leech/server.rb', line 204

def handle(pattern, &block)
  @inline_handler.handle(pattern, &block)
end

#join ⇒ Object

Delegates to the acceptor thread's #join method.

See Also:

  • Thread#join


# File 'lib/leech/server.rb', line 158

def join
  @acceptor.join if @acceptor
end

#process_client(c) ⇒ Object

Gets information about the client connection and starts a conversation with it. Received commands are passed to the declared handlers, where they will be processed.

Parameters:

  • c (TCPSocket)

    Client socket



# File 'lib/leech/server.rb', line 313

def process_client(c)
  Thread.current[:client] = c
  Thread.current[:info] = {
    :port => client.peeraddr[1],
    :host => client.peeraddr[2],
    :addr => client.peeraddr[3],
    }
  info[:uri] = [info[:host], info[:port]].join(':')
  logger.debug "Processing client from #{info[:uri]}"
  while line = client.gets
    line = line.chomp.strip
    logger.info "Dispatching command (#{info[:uri]}): #{line}"
    @handlers.each do |handler|
      if handler = handler.new(self).match(line.chomp.strip)
        handler.call
        next
      end
    end
  end
end

#reap_dead_workers(reason = 'unknown') ⇒ Integer

Used internally to kill off any worker threads that have taken too long to complete processing. It is only called if there are too many workers currently servicing connections. It returns the count of workers still active after the reap is done, and only runs if there are workers to reap.

Parameters:

  • reason (String) (defaults to: 'unknown')

    Reason why method was executed

Returns:

  • (Integer)

    Number of worker threads still active after the reap is done.



# File 'lib/leech/server.rb', line 218

def reap_dead_workers(reason='unknown')
  if @workers.list.length > 0
    logger.error  "#{Time.now}: Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
    error_msg = "Leech timed out this thread: #{reason}"
    mark = Time.now
    @workers.list.each do |worker|
      worker[:started_on] = Time.now if not worker[:started_on]
      if mark - worker[:started_on] > @timeout + @throttle
        logger.error "Thread #{worker.inspect} is too old, killing."
        worker.raise(TimeoutError.new(error_msg))
      end
    end
  end
  return @workers.list.length
end

#run ⇒ Thread

Starts a TCP listener on the host and port declared in the options. It returns the thread used, so you can join it. Each client connection will be processed in a separate thread.

Returns:

  • (Thread)

    Main thread for this server instance.



# File 'lib/leech/server.rb', line 262

def run
  use(@inline_handler)
  @socket = TCPServer.new(@host, @port)
  @handlers = @handlers.uniq.freeze
  @options = @options.freeze
  @acceptor = Thread.new do 
    begin
      logger.debug "Starting leech server on tcp://#{@host}:#{@port}"
      loop do 
        begin
          client = @socket.accept 
          worker_list = @workers.list
          
          if worker_list.length >= @max_workers
            logger.error "Server overloaded with #{worker_list.length} workers (#@max_workers max). Dropping connection."
            client.close rescue nil
            reap_dead_workers("max processors")
          else
            thread = Thread.new(client) {|c| process_client(c) }
            thread[:started_on] = Time.now
            @workers.add(thread)
            sleep @throttle if @throttle > 0
          end
        rescue StopServer
          break
        rescue Errno::EMFILE
          reap_dead_workers("too many open files")
          sleep 0.5
        rescue Errno::ECONNABORTED
          client.close rescue nil
        rescue Object => e
          logger.error  "#{Time.now}: Unhandled listen loop exception #{e.inspect}."
          logger.error  e.backtrace.join("\n")
        end
      end
      graceful_shutdown
    ensure
      @socket.close  
      logger.debug "Closing leech server on tcp://#{@host}:#{@port}"
    end
  end
  
  return @acceptor
end

#running? ⇒ Boolean

Returns the actual server state: `true` if the server's acceptor thread is alive.

Returns:

  • (Boolean)

    Actual server state: `true` if the server's acceptor thread is alive.



# File 'lib/leech/server.rb', line 347

def running?
  @acceptor && @acceptor.alive?
end

#stop ⇒ Object

Stops the acceptor thread and then causes the worker threads to finish off the request queue before finally exiting. It also resets the frozen settings.



# File 'lib/leech/server.rb', line 247

def stop
  if running?
    @acceptor.raise(StopServer.new)
    # Replace the frozen settings with mutable copies so the server can be reconfigured.
    @handlers = @handlers.dup
    @options  = @options.dup
    sleep(0.5) while @acceptor.alive?
  end
end
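
Together with #running?, this allows a simple start/stop cycle, for example:

server = Leech::Server.new(:port => 9933)
server.run
server.running?   # => true

server.stop
server.running?   # => false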

#use(handler) ⇒ Object

Registers the specified handler for use in this server instance. Handlers extend the server's functionality by defining custom command-handling callbacks or server instance methods.

Parameters:

  • handler (Class, Symbol, String)

    Handler class, or its symbolic name (e.g. :auth)

Raises:

  • (Error)

    When a handler with the given symbolic name cannot be found

  • (Leech::Handler::Error)

    When the handler is not a Class, Symbol or String

See Also:

  • Leech::Handler



# File 'lib/leech/server.rb', line 173

def use(handler)
  case handler
  when Class
    @handlers << handler
    handler.used(self)
  when Symbol, String
    begin
      require "leech/handlers/#{handler.to_s}"
      klass = handler.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
      use(eval("Leech::Handlers::#{klass}"))
    rescue LoadError
      raise Error, "Could not find #{handler} handler"
    end
  else
    raise Leech::Handler::Error, "Invalid handler #{handler}"
  end
end
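
Besides the bundled handlers referenced by symbol (such as :auth), a handler class can be passed directly. A minimal sketch, assuming a custom handler subclasses Leech::Handler and registers patterns with the class-level handle method (as the inline handler created in #initialize does); the UptimeHandler name and its UPTIME command are purely illustrative:

class UptimeHandler < Leech::Handler
  # Respond to the UPTIME command with the host's uptime output.
  handle /^UPTIME$/ do |env,params|
    answer(`uptime`)
  end
end

server = Leech::Server.new(:port => 9933)
server.use UptimeHandler
server.run
server.join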