Class: Leech::Server
- Inherits:
-
Object
- Object
- Leech::Server
- Defined in:
- lib/leech/server.rb
Overview
This is a simple TCP server, similar to Rack, but designed for asynchronously handling short text commands. It can be used for monitoring purposes or for simple communication between a few machines.
### Creating server instance
A server can be created in two ways:
    server = Leech::Server.new :port => 666
    server.use :auth
    server.run
…or using block style:
    Leech::Server.new do
      port 666
      use :auth
      run
    end
### Complete server example
A simple server with authorization and a few command handlers can be configured like this:
    server = Leech::Server.new do
      # simple authorization handler, see Leech::AuthHandler for
      # more information.
      use :auth
      host 'localhost'
      port 666
      max_workers 100
      timeout 30
      logger Logger.new('/var/logs/leech/server.log')
      handle /^SHOW FILE (.*)$/ do |env, params|
        # params[1] is assumed to hold the first capture group (the file name)
        if File.exist?(params[1])
          answer(File.read(params[1]))
        else
          answer('NOT FOUND')
        end
      end
      handle /^DELETE FILE (.*)$/ do |env, params|
        # File.delete returns the number of deleted files, so convert it to text
        answer(File.delete(params[1]).to_s)
      end
      run
    end
    # Now we have to join server thread with main thread.
    server.join
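The server above speaks a plain line-based protocol, so any TCP client can talk to it. The following is a minimal client sketch, not part of Leech: `send_command` is a hypothetical helper name, and it assumes the server replies with one line per command.

```ruby
require 'socket'

# Hypothetical helper (not part of Leech): sends one text command to a
# line-based TCP server and returns the first line of the reply.
def send_command(host, port, command)
  socket = TCPSocket.new(host, port)
  begin
    socket.puts(command)   # commands are newline-terminated text
    reply = socket.gets    # blocks until the server answers or closes
    reply && reply.chomp
  ensure
    socket.close
  end
end
```

Against the example configuration this might be called as `send_command('localhost', 666, 'SHOW FILE /tmp/report.txt')`, where the file path is only an illustration.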
Defined Under Namespace
Classes: Error, StopServer, TimeoutError
Instance Attribute Summary collapse
-
#acceptor ⇒ Thread
readonly
Server main thread.
-
#host ⇒ String
readonly
Server will bind to this host.
-
#logger ⇒ Logger
readonly
Logging object.
-
#max_workers ⇒ Int, Float
readonly
The maximum number of concurrent processors to accept. Any connection over this limit is closed immediately to maintain server processing performance.
-
#options ⇒ Hash
readonly
Server configuration.
-
#port ⇒ Int
readonly
Server will be listening on this port.
-
#throttle ⇒ Int, Float
readonly
A sleep timeout (in hundredths of a second) that is placed between socket.accept calls in order to give the server a cheap throttle time.
-
#timeout ⇒ Int, Float
readonly
Maximum idle time.
Instance Method Summary collapse
-
#answer(msg) ⇒ Object
(also: #say)
Sends an answer to the currently connected socket.
-
#graceful_shutdown ⇒ Object
Performs a wait on all the currently running threads and kills any that take too long.
-
#handle(pattern, &block) ⇒ Object
Defines a matching pattern in @inline_handler.
-
#initialize(opts = {}, &block) ⇒ Server
constructor
Creates a working server on host:port.
-
#join ⇒ Object
Delegates to the acceptor thread's #join method.
-
#process_client(c) ⇒ Object
Gathers information about the client connection and starts a conversation with it.
-
#reap_dead_workers(reason = 'unknown') ⇒ Array<Thread>
Used internally to kill off any worker threads that have taken too long to complete processing.
-
#run ⇒ Thread
Starts serving TCP listener on host and port declared in options.
-
#running? ⇒ Boolean
Current server state.
-
#stop ⇒ Object
Stops the acceptor thread and then causes the worker threads to finish off the request queue before finally exiting.
-
#use(handler) ⇒ Object
Registers the specified handler for use in this server instance.
Constructor Details
#initialize(opts = {}, &block) ⇒ Server
Creates a working server on host:port. Use #run to start the server and `acceptor.join` to join the thread that's processing incoming requests on the socket.
    # File 'lib/leech/server.rb', line 137
    def initialize(opts={}, &block)
      @handlers = []
      @workers = ThreadGroup.new
      @acceptor = nil
      @mutex = Mutex.new
      @options = opts
      @host = opts[:host] || 'localhost'
      @port = opts[:port] || 9933
      @logger = opts[:logger] || Logger.new(STDOUT)
      @max_workers = opts[:max_workers] || 100
      @timeout = opts[:timeout] || 30
      @throttle = opts[:throttle].to_i / 100.0
      @inline_handler = Class.new(Leech::Handler)
      instance_eval(&block) if block_given?
    end
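The constructor converts the `:throttle` option from hundredths of a second into seconds. The sketch below isolates that one expression to show the resulting values; `throttle_seconds` is a hypothetical name used only for illustration.

```ruby
# Mirrors the expression from the constructor: opts[:throttle].to_i / 100.0
# (throttle_seconds is an illustrative name, not a Leech method).
def throttle_seconds(opts)
  opts[:throttle].to_i / 100.0
end

unset      = throttle_seconds({})                # nil.to_i is 0, so 0.0
half       = throttle_seconds(:throttle => 50)   # 50 hundredths, so 0.5
fractional = throttle_seconds(:throttle => 2.9)  # to_i truncates to 2, so 0.02
```

Because of `to_i`, fractional values are truncated before the division, so sub-hundredth precision is lost.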
Instance Attribute Details
#acceptor ⇒ Thread (readonly)
Server main thread
    # File 'lib/leech/server.rb', line 70
    def acceptor
      @acceptor
    end
#host ⇒ String (readonly)
Server will bind to this host
    # File 'lib/leech/server.rb', line 80
    def host
      @host
    end
#logger ⇒ Logger (readonly)
Logging object
    # File 'lib/leech/server.rb', line 90
    def logger
      @logger
    end
#max_workers ⇒ Int, Float (readonly)
The maximum number of concurrent processors to accept. Any connection over this limit is closed immediately to maintain server processing performance.
This may seem mean, but it is the most efficient way to deal with overload.
Other schemes involve still parsing the client's request, which defeats the point of an overload handling system.
    # File 'lib/leech/server.rb', line 99
    def max_workers
      @max_workers
    end
#options ⇒ Hash (readonly)
Server configuration
    # File 'lib/leech/server.rb', line 75
    def options
      @options
    end
#port ⇒ Int (readonly)
Server will be listening on this port
    # File 'lib/leech/server.rb', line 85
    def port
      @port
    end
#throttle ⇒ Int, Float (readonly)
A sleep timeout (in hundredths of a second) that is placed between socket.accept calls in order to give the server a cheap throttle time.
It defaults to 0, in which case no sleep is performed at all.
    # File 'lib/leech/server.rb', line 112
    def throttle
      @throttle
    end
#timeout ⇒ Int, Float (readonly)
Maximum idle time
    # File 'lib/leech/server.rb', line 104
    def timeout
      @timeout
    end
Instance Method Details
#answer(msg) ⇒ Object Also known as: say
Sends an answer to the currently connected socket. This method should be called only in a worker thread.
    # File 'lib/leech/server.rb', line 339
    def answer(msg)
      logger.debug("Answering to (#{info[:uri]}): #{msg.chomp.strip}")
      client.puts(msg)
    end
#graceful_shutdown ⇒ Object
Performs a wait on all the currently running threads and kills any that take too long. It waits for `@timeout` seconds, which can be set in `#initialize`. The `@throttle` setting extends this waiting period by that much longer.
    # File 'lib/leech/server.rb', line 237
    def graceful_shutdown
      while reap_dead_workers("shutdown") > 0
        logger.error "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout + @throttle} seconds."
        sleep @timeout / 10
      end
    end
#handle(pattern, &block) ⇒ Object
Defines a matching pattern in @inline_handler. It is similar to `Leech::Handler#handle`, but can be used only in block style.
    # File 'lib/leech/server.rb', line 204
    def handle(pattern, &block)
      @inline_handler.handle(pattern, &block)
    end
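To illustrate the idea of registering patterns on an anonymous handler class, here is a minimal sketch. `MiniHandler` and its `dispatch` method are hypothetical stand-ins written for this example; the real `Leech::Handler` is not shown on this page and may work differently.

```ruby
# Hypothetical stand-in for a handler base class; Leech::Handler may differ.
class MiniHandler
  # Patterns and blocks are stored per class, in registration order.
  def self.matchers
    @matchers ||= []
  end

  # Registers a block to run for lines matching the pattern.
  def self.handle(pattern, &block)
    matchers << [pattern, block]
  end

  # Runs the first block whose pattern matches the line; returns nil otherwise.
  def self.dispatch(line)
    matchers.each do |pattern, block|
      match = pattern.match(line)
      return block.call(match) if match
    end
    nil
  end
end

# An anonymous subclass keeps its own pattern list, much like the
# Class.new(Leech::Handler) created in the constructor.
inline_handler = Class.new(MiniHandler)
inline_handler.handle(/^ECHO (.*)$/) { |m| m[1] }
```

Using a fresh anonymous subclass per server keeps inline patterns from leaking into the shared base class.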
#join ⇒ Object
Delegates to the acceptor thread's #join method.
    # File 'lib/leech/server.rb', line 158
    def join
      @acceptor.join if @acceptor
    end
#process_client(c) ⇒ Object
Gathers information about the client connection and starts a conversation with it. Received commands are passed to the declared handlers, where they are processed.
    # File 'lib/leech/server.rb', line 313
    def process_client(c)
      Thread.current[:client] = c
      Thread.current[:info] = {
        :port => client.peeraddr[1],
        :host => client.peeraddr[2],
        :addr => client.peeraddr[3],
      }
      info[:uri] = [info[:host], info[:port]].join(':')
      logger.debug "Processing client from #{info[:uri]}"
      while line = client.gets
        line = line.chomp.strip
        logger.info "Dispatching command (#{info[:uri]}): #{line}"
        @handlers.each do |handler|
          if handler = handler.new(self).match(line.chomp.strip)
            handler.call
            next
          end
        end
      end
    end
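The listing calls `client` and `info` without showing their definitions. One plausible reading is that they are small readers over `Thread.current`, which gives each worker thread its own connection state. The sketch below shows that pattern; `ConnectionContext` and both readers are assumptions made for illustration, not the library's code.

```ruby
# Assumed helpers: a guess at how `client` and `info` might be defined,
# since this page does not show them.
class ConnectionContext
  def client
    Thread.current[:client]
  end

  def info
    Thread.current[:info]
  end
end

ctx = ConnectionContext.new

# Each thread stores its own values, so workers never see each other's state.
results = 2.times.map do |i|
  Thread.new do
    Thread.current[:client] = "socket-#{i}"
    Thread.current[:info]   = { :uri => "host:#{1000 + i}" }
    [ctx.client, ctx.info[:uri]]
  end
end.map(&:value)
```

This also explains why such methods should be called only from a worker thread: outside one, the thread-local slots are simply unset.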
#reap_dead_workers(reason = 'unknown') ⇒ Array<Thread>
Used internally to kill off any worker threads that have taken too long to complete processing. Only called if there are too many processors currently servicing. It returns the count of workers still active after the reap is done. It only runs if there are workers to reap.
    # File 'lib/leech/server.rb', line 218
    def reap_dead_workers(reason='unknown')
      if @workers.list.length > 0
        logger.error "#{Time.now}: Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
        error_msg = "Leech timed out this thread: #{reason}"
        mark = Time.now
        @workers.list.each do |worker|
          worker[:started_on] = Time.now if not worker[:started_on]
          if mark - worker[:started_on] > @timeout + @throttle
            logger.error "Thread #{worker.inspect} is too old, killing."
            worker.raise(TimeoutError.new(error_msg))
          end
        end
      end
      return @workers.list.length
    end
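The reaping technique relies on two standard Ruby features: a `ThreadGroup` to track workers and `Thread#raise` to interrupt one from outside. A self-contained sketch of the same idea follows; `SketchTimeout`, the 0.05 second limit, and the backdated start time are all illustrative assumptions.

```ruby
# Illustrative error class; Leech defines its own TimeoutError.
class SketchTimeout < StandardError; end

workers = ThreadGroup.new
timeout = 0.05            # assumed limit, in seconds
ready   = Queue.new

slow = Thread.new do
  begin
    ready << true         # signal that the rescue handler is now in place
    sleep                 # simulates a worker stuck on a slow client
  rescue SketchTimeout
    :reaped
  end
end
ready.pop
slow[:started_on] = Time.now - 1   # pretend the worker started a second ago
workers.add(slow)

# Same age check as in the listing: raise into any worker that is too old.
mark = Time.now
workers.list.each do |worker|
  worker[:started_on] ||= Time.now
  if mark - worker[:started_on] > timeout
    worker.raise(SketchTimeout.new('timed out'))
  end
end

outcome = slow.value      # waits for the worker and returns its result
```

The `ready` queue is only there to make the sketch deterministic: it guarantees the worker is inside its `begin` block before the exception is raised.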
#run ⇒ Thread
Starts serving a TCP listener on the host and port declared in options. It returns the thread used, so you can join it. Each client connection will be processed in a separate thread.
    # File 'lib/leech/server.rb', line 262
    def run
      use(@inline_handler)
      @socket = TCPServer.new(@host, @port)
      @handlers = @handlers.uniq.freeze
      @options = @options.freeze
      @acceptor = Thread.new do
        begin
          logger.debug "Starting leech server on tcp://#{@host}:#{@port}"
          loop do
            begin
              client = @socket.accept
              worker_list = @workers.list
              if worker_list.length >= @max_workers
                logger.error "Server overloaded with #{worker_list.length} workers (#@max_workers max). Dropping connection."
                client.close rescue nil
                reap_dead_workers("max processors")
              else
                thread = Thread.new(client) {|c| process_client(c) }
                thread[:started_on] = Time.now
                @workers.add(thread)
                sleep @throttle if @throttle > 0
              end
            rescue StopServer
              break
            rescue Errno::EMFILE
              reap_dead_workers("too many open files")
              sleep 0.5
            rescue Errno::ECONNABORTED
              client.close rescue nil
            rescue Object => e
              logger.error "#{Time.now}: Unhandled listen loop exception #{e.inspect}."
              logger.error e.backtrace.join("\n")
            end
          end
          graceful_shutdown
        ensure
          @socket.close
          logger.debug "Closing leech server on tcp://#{@host}:#{@port}"
        end
      end
      return @acceptor
    end
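Stripped of logging, limits and error handling, the pattern above is an accept loop running in its own thread that is stopped by raising an exception into it. The sketch below is a simplified stand-alone version of that idea, not the library's implementation; `SketchStop`, the `HELLO` greeting, and the use of port 0 are assumptions for the example.

```ruby
require 'socket'

# Illustrative stop signal; Leech uses its own StopServer class.
class SketchStop < StandardError; end

listener = TCPServer.new('127.0.0.1', 0)  # port 0 lets the OS pick a free port
ready    = Queue.new

acceptor = Thread.new do
  begin
    ready << true                         # the rescue below is now active
    loop do
      client = listener.accept
      # One thread per connection, as in the listing above.
      Thread.new(client) { |c| c.puts('HELLO'); c.close }
    end
  rescue SketchStop
    :stopped
  ensure
    listener.close
  end
end
ready.pop

# Talk to the server once, then stop it from the outside.
socket   = TCPSocket.new('127.0.0.1', listener.addr[1])
greeting = socket.gets
socket.close

acceptor.raise(SketchStop.new)
result = acceptor.value
```

Raising into the acceptor interrupts the blocking `accept` call, and the `ensure` clause guarantees the listening socket is closed however the loop ends.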
#running? ⇒ Boolean
Returns the current server state: `true` if the server acceptor thread is alive.
    # File 'lib/leech/server.rb', line 347
    def running?
      @acceptor && @acceptor.alive?
    end
#stop ⇒ Object
Stops the acceptor thread and then causes the worker threads to finish off the request queue before finally exiting. It also resets the frozen settings.
    # File 'lib/leech/server.rb', line 247
    def stop
      if running?
        @acceptor.raise(StopServer.new)
        @handlers = Array.new(@handlers)
        @options = Hash.new(@options)
        sleep(0.5) while @acceptor.alive?
      end
    end
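When reading the reset step, it may help to recall how the two constructors behave in plain Ruby: `Array.new(array)` copies the elements, whereas `Hash.new(obj)` creates an empty hash that uses `obj` as its default value. The sketch below compares them using made-up sample data, with `dup` shown as one possible way to get an unfrozen copy of a hash.

```ruby
# Sample data for illustration only.
handlers = [:auth, :inline].freeze
options  = { :port => 9933 }.freeze

handlers_copy = Array.new(handlers)  # unfrozen array with the same elements
options_new   = Hash.new(options)    # empty hash; `options` is its default value
options_copy  = options.dup          # unfrozen hash with the same pairs

handlers_copy << :extra              # works, the copy is not frozen
options_copy[:host] = 'localhost'    # works, the copy is not frozen
```

With `Hash.new(options)`, a lookup of any missing key returns the whole original hash, which may not be what a caller of `#options` expects after a stop.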
#use(handler) ⇒ Object
Registers the specified handler for use in this server instance. Handlers extend the functionality of the server by defining custom command handling callbacks or server instance methods.
    # File 'lib/leech/server.rb', line 173
    def use(handler)
      case handler
      when Class
        @handlers << handler
        handler.used(self)
      when Symbol, String
        begin
          require "leech/handlers/#{handler.to_s}"
          klass = handler.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
          use(eval("Leech::Handlers::#{klass}"))
        rescue LoadError
          raise Error, "Could not find #{handler} handler"
        end
      else
        raise Leech::Handler::Error, "Invalid handler #{handler}"
      end
    end
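For a Symbol or String, the method derives a class name from the handler name with two `gsub` calls. The sketch below extracts just that conversion so its effect can be checked in isolation; `handler_class_name` is a hypothetical wrapper, and every handler name other than `auth` is made up for the example.

```ruby
# Wraps the conversion expression from #use (the method name is illustrative).
def handler_class_name(handler)
  handler.to_s
         .gsub(/\/(.?)/) { "::#{$1.upcase}" }   # "a/b" becomes "a::B"
         .gsub(/(?:^|_)(.)/) { $1.upcase }      # "foo_bar" becomes "FooBar"
end

simple     = handler_class_name(:auth)             # "Auth"
underscore = handler_class_name(:basic_auth)       # "BasicAuth"
nested     = handler_class_name('net/basic_auth')  # "Net::BasicAuth"
```

The resulting name is then looked up under `Leech::Handlers`, so `:auth` resolves to `Leech::Handlers::Auth` after `leech/handlers/auth` has been required.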