Class: AGIServer
- Inherits:
-
Object
- Object
- AGIServer
- Defined in:
- lib/AGIServer.rb
Overview
AGIServer is a threaded server framework that is intended to be used to communicate with an Asterisk PBX via the Asterisk Gateway Interface, an interface for adding functionality to asterisk. This class implements a server object which will listen on a tcp host:port and accept connections, setup an AGI object, and either yield to a supplied block, which itself defines callflow, or route to public methods of the AGIRoute objects.
Constant Summary collapse
- @@servers =
A list of all current AGIServers
[]
Instance Attribute Summary collapse
-
#bind_host ⇒ Object
readonly
Binding Parameters supplied during initialization.
-
#bind_port ⇒ Object
readonly
Binding Parameters supplied during initialization.
Class Method Summary collapse
-
.shutdown ⇒ Object
Calls shutdown on all AGIServer objects.
Instance Method Summary collapse
-
#initialize(params = {}) ⇒ AGIServer
constructor
Creates an AGIServer Object based on the provided Parameter Hash, and binds to the appropriate host/port.
-
#join ⇒ Object
(also: #finish)
Will wait for the Monitor and Listener threads to join.
-
#run(&block) ⇒ Object
(also: #start)
call-seq: run() run() { |agi| block } run() { |agi,params| block }.
-
#shutdown ⇒ Object
Closes the listener socket, so that no new requests will be accepted.
-
#stop ⇒ Object
Closes the listener socket, so that no new requests will be accepted.
Constructor Details
#initialize(params = {}) ⇒ AGIServer
Creates an AGIServer Object based on the provided Parameter Hash, and binds to the appropriate host/port. Will also set signal handlers that will shut down all AGIServer’s upon receipt of SIGINT or SIGTERM.
-
:bind_host sets the hostname or ip address to bind to. Defaults to localhost.
-
:bind_port sets the port to bind to. Defaults to 4573.
-
:max_workers sets the maximum number of worker threads to allow for connection processing. Defaults to 10
-
:min_workers sets the minimum number of worker threads to maintain for connection processing. Defaults to 5
-
:jobs_per_worker sets the number of connections each worker will handle before exiting. Defaults to 50
-
:logger sets the Logger object to use for logging. Defaults to Logger.new(STDERR).
-
:params can be any object you wish to be made available to all workers; I suggest a hash of objects.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/AGIServer.rb', line 127 def initialize(params={}) #Options @bind_host = params[:bind_host] || 'localhost' @bind_port = params[:bind_port] || 4573 @max_workers = params[:max_workers] || 10 @min_workers = params[:min_workers] || 5 @jobs_per_worker = params[:jobs_per_worker] || 50 @logger = params[:logger] || Logger.new(STDERR) @stats = params[:stats] || false @params = params[:params] || Hash.new #Threads @listener = nil @monitor = nil @workers = [] #Synchronization @worker_queue = Queue.new @shutdown = false #Initial Bind begin @listen_socket = TCPServer.new(@bind_host, @bind_port) rescue Errno::EADDRINUSE @logger.fatal("AGIServer cannot bind to #{@bind_host}:#{@bind_port}, Address already in use.") raise end #Track for signal handling @@servers << self AGIRouter.logger(@logger) trap('INT') { shutdown } trap('TERM') { shutdown } end |
Instance Attribute Details
#bind_host ⇒ Object (readonly)
Binding Parameters supplied during initialization.
118 119 120 |
# File 'lib/AGIServer.rb', line 118 def bind_host @bind_host end |
#bind_port ⇒ Object (readonly)
Binding Parameters supplied during initialization.
118 119 120 |
# File 'lib/AGIServer.rb', line 118 def bind_port @bind_port end |
Class Method Details
.shutdown ⇒ Object
Calls shutdown on all AGIServer objects.
261 262 263 |
# File 'lib/AGIServer.rb', line 261 def AGIServer.shutdown @@servers.each { |server| server.shutdown } end |
Instance Method Details
#join ⇒ Object Also known as: finish
Will wait for the Monitor and Listener threads to join. The Monitor thread itself will wait for all of it’s instantiated Worker threads to join.
246 247 248 249 |
# File 'lib/AGIServer.rb', line 246 def join @listener.join && @logger.debug{"AGIServer Listener Thread closed"} @monitor.join && @logger.debug{"AGIServer Monitor Thread closed"} end |
#run(&block) ⇒ Object Also known as: start
call-seq: run() run() { |agi| block } run() { |agi,params| block }
Starts the server to run. If a block is provided, the block will be run by all workers to handle connections. If a block is not provided, will attempt to route calls to public methods of AGIRoute objects.
-
Listener Thread: The Listener Thread is the simplest of the Threads. It accepts client sockets from the main socket, and enqueues those client sockets into the worker_queue.
-
Worker Threads: The Worker Thread is also fairly simple. It loops jobs_per_worker times, and each time, dequeues from the worker_queue. If the result is nil, it exits, otherwise, it interacts with the client socket, either yielding to the aforementioned supplied block or routing to the AGIRoutes. If a Worker Thread is instantiated, it will continue to process requests until it processes jobs_per_worker jobs or the server is stopped.
-
Monitor Thread: The Monitor Thread is the most complex of the threads at use. It instantiates Worker Threads if at any time it detects that there are fewer workers than min_workers, and if at any time it detects that the worker_queue length is greater than zero while there are fewer than max_workers.
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/AGIServer.rb', line 172 def run(&block) @logger.info{"AGIServer Initializing Monitor Thread"} @monitor = Thread.new do poll = 0 while ! @shutdown do poll += 1 if (@workers.length < @max_workers and @worker_queue.length > 0) or ( @workers.length < @min_workers ) then @logger.info{"AGIServer Starting Worker Thread to handle requests"} #Begin Worker Thread worker_thread = Thread.new do @jobs_per_worker.times do client = @worker_queue.deq break if client.nil? @logger.debug{"AGIServer Worker received Connection"} agi = AGI.new({ :input => client, :output => client, :logger => @logger }) begin agi.init params = @params if block.nil? router = AGIRouter.new(agi.channel_params['request']) router.route(agi, params) else if block.arity == 2 yield(agi, params) elsif block.arity == 1 yield(agi) end end rescue AGIHangupError => error @logger.error{"AGIServer Worker Caught Unhandled Hangup: #{error}"} rescue AGIError => error @logger.error{"AGIServer Worker Caught Unhandled Exception: #{error.class} #{error.to_s}"} rescue Exception => error @logger.error{"AGIServer Worker Got Unhandled Exception: #{error.class} #{error}"} ensure client.close @logger.debug{"AGIServer Worker done with Connection"} end end @workers.delete(Thread.current) @logger.info{"AGIServer Worker handled last Connection, terminating"} end #End Worker Thread @workers << worker_thread next #Short Circuit back without a sleep in case we need more threads for load end if @stats and poll % 10 == 0 then @logger.debug{"AGIServer #{@workers.length} active workers, #{@worker_queue.length} jobs waiting"} end sleep 1 end @logger.debug{"AGIServer Signaling all Worker Threads to finish up and exit"} @workers.length.times{ @worker_queue.enq(nil) } @workers.each { |worker| worker.join } @logger.debug{"AGIServer Final Worker Thread closed"} end @logger.info{"AGIServer Initializing Listener Thread"} @listener = Thread.new do begin while( client = @listen_socket.accept ) @logger.debug{"AGIServer Listener received Connection Request"} @worker_queue.enq(client) end rescue IOError # Occurs on socket shutdown. end end end |
#shutdown ⇒ Object
Closes the listener socket, so that no new requests will be accepted. Signals to the Monitor thread to shutdown it’s Workers when they’re done with their current clients.
253 254 255 256 257 |
# File 'lib/AGIServer.rb', line 253 def shutdown @logger.info{"AGIServer Shutting down gracefully"} @listen_socket.close && @logger.info{"AGIServer No longer accepting connections"} @shutdown = true && @logger.info{"AGIServer Signaling Monitor to close after active sessions complete"} end |
#stop ⇒ Object
Closes the listener socket, so that no new requests will be accepted. Signals to the Monitor thread to shutdown it’s Workers when they’re done with their current clients.
258 259 260 261 262 |
# File 'lib/AGIServer.rb', line 258 def shutdown @logger.info{"AGIServer Shutting down gracefully"} @listen_socket.close && @logger.info{"AGIServer No longer accepting connections"} @shutdown = true && @logger.info{"AGIServer Signaling Monitor to close after active sessions complete"} end |