Class: AGIServer

Inherits:
Object
Defined in:
lib/AGIServer.rb

Overview

AGIServer is a threaded server framework for communicating with an Asterisk PBX via the Asterisk Gateway Interface (AGI), an interface for adding functionality to Asterisk. This class implements a server object that listens on a TCP host:port, accepts connections, sets up an AGI object for each one, and either yields to a supplied block, which defines the call flow, or routes to public methods of AGIRoute objects.

Constant Summary

@@servers = []

A list of all current AGIServers.

Constructor Details

#initialize(params = {}) ⇒ AGIServer

Creates an AGIServer object based on the provided parameter hash, and binds to the appropriate host/port. Also sets signal handlers that shut down all AGIServers upon receipt of SIGINT or SIGTERM.

  • :bind_host sets the hostname or ip address to bind to. Defaults to localhost.

  • :bind_port sets the port to bind to. Defaults to 4573.

  • :max_workers sets the maximum number of worker threads to allow for connection processing. Defaults to 10.

  • :min_workers sets the minimum number of worker threads to maintain for connection processing. Defaults to 5.

  • :jobs_per_worker sets the number of connections each worker will handle before exiting. Defaults to 50.

  • :logger sets the Logger object to use for logging. Defaults to Logger.new(STDERR).

  • :stats, when true, makes the Monitor thread log the number of active workers and waiting jobs at debug level every tenth poll. Defaults to false.

  • :params can be any object you wish to make available to all workers; a hash of objects is suggested. Defaults to an empty Hash.
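As a rough illustration of how these defaults combine with caller-supplied values, the sketch below merges a parameter hash over the documented defaults. The constant `AGI_SERVER_DEFAULTS` and the helper `agi_server_options` are hypothetical names invented for this example and are not part of AGIServer; the constructor itself applies each default inline with `||`.

```ruby
# Hypothetical constant mirroring the defaults documented above.
AGI_SERVER_DEFAULTS = {
  bind_host:       'localhost',
  bind_port:       4573,
  max_workers:     10,
  min_workers:     5,
  jobs_per_worker: 50
}.freeze

# Hypothetical helper: returns the effective options for a parameter hash.
# A nil or false value falls back to the default, like `params[:x] || default`.
def agi_server_options(params = {})
  AGI_SERVER_DEFAULTS.merge(params) { |_key, default, given| given || default }
end

options = agi_server_options(bind_port: 4574, max_workers: 20)
puts options[:bind_host]   # default kept
puts options[:bind_port]   # caller's value used
```

A hash built this way could then be handed to `AGIServer.new`, assuming the library is loaded.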



# File 'lib/AGIServer.rb', line 127

def initialize(params={})
  #Options
  @bind_host        = params[:bind_host]        || 'localhost'
  @bind_port        = params[:bind_port]        || 4573
  @max_workers      = params[:max_workers]      || 10
  @min_workers      = params[:min_workers]      || 5
  @jobs_per_worker  = params[:jobs_per_worker]  || 50
  @logger           = params[:logger]           || Logger.new(STDERR)
  @stats            = params[:stats]            || false
  @params           = params[:params]           || Hash.new

  #Threads
  @listener         = nil
  @monitor          = nil
  @workers          = []
  
  #Synchronization
  @worker_queue     = Queue.new
  @shutdown         = false

  #Initial Bind
  begin
    @listen_socket  = TCPServer.new(@bind_host, @bind_port)
  rescue Errno::EADDRINUSE
    @logger.fatal("AGIServer cannot bind to #{@bind_host}:#{@bind_port}, Address already in use.")
    raise    
  end
  
  #Track for signal handling
  @@servers << self
  AGIRouter.logger(@logger)
  
  trap('INT')   { shutdown }
  trap('TERM')  { shutdown }
end

Instance Attribute Details

#bind_host ⇒ Object (readonly)

Binding Parameters supplied during initialization.



# File 'lib/AGIServer.rb', line 118

def bind_host
  @bind_host
end

#bind_port ⇒ Object (readonly)

Binding Parameters supplied during initialization.



# File 'lib/AGIServer.rb', line 118

def bind_port
  @bind_port
end

Class Method Details

.shutdown ⇒ Object

Calls shutdown on all AGIServer objects.



# File 'lib/AGIServer.rb', line 261

def AGIServer.shutdown
  @@servers.each { |server| server.shutdown }
end

Instance Method Details

#join ⇒ Object Also known as: finish

Waits for the Monitor and Listener threads to join. The Monitor thread itself waits for all of its instantiated Worker threads to join.



# File 'lib/AGIServer.rb', line 246

def join
  @listener.join && @logger.debug{"AGIServer Listener Thread closed"}
  @monitor.join && @logger.debug{"AGIServer Monitor Thread closed"}
end

#run(&block) ⇒ Object Also known as: start

call-seq:
  run()
  run() { |agi| block }
  run() { |agi,params| block }

Starts the server. If a block is provided, all workers run it to handle connections. If no block is provided, the server attempts to route calls to public methods of AGIRoute objects.

  1. Listener Thread: The Listener Thread is the simplest of the threads. It accepts client sockets from the main socket and enqueues them into the worker_queue.

  2. Worker Threads: The Worker Thread is also fairly simple. It loops up to jobs_per_worker times, dequeuing from the worker_queue on each pass. If the result is nil, it exits; otherwise it interacts with the client socket, either yielding to the supplied block or routing to the AGIRoutes. Once instantiated, a Worker Thread continues to process requests until it has handled jobs_per_worker jobs or the server is stopped.

  3. Monitor Thread: The Monitor Thread is the most complex of the threads in use. It instantiates a Worker Thread whenever it detects that there are fewer workers than min_workers, or that the worker_queue length is greater than zero while there are fewer than max_workers workers.
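The queue-and-sentinel pattern described in the list above can be sketched in isolation. This is a simplified illustration, not the server's implementation: `run_worker_pool` is a hypothetical helper, the "job" is just doubling a number, and there is no Monitor thread spawning replacement workers, so the example assumes `jobs_per_worker` is large enough for the pool to drain the queue.

```ruby
# Hypothetical helper, not part of AGIServer: drains a queue of numbers
# with a fixed pool of workers and returns the doubled values, sorted.
def run_worker_pool(jobs, worker_count:, jobs_per_worker:)
  queue   = Queue.new
  results = Queue.new

  jobs.each { |job| queue.enq(job) }
  # One nil per worker acts as the "finish up and exit" signal.
  worker_count.times { queue.enq(nil) }

  workers = Array.new(worker_count) do
    Thread.new do
      # Each worker handles at most jobs_per_worker jobs, then exits.
      jobs_per_worker.times do
        job = queue.deq
        break if job.nil?
        results.enq(job * 2)
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.deq }.sort
end

p run_worker_pool([1, 2, 3, 4, 5], worker_count: 2, jobs_per_worker: 50)
# prints [2, 4, 6, 8, 10]
```

Using nil as the sentinel keeps the worker loop free of extra flags: a worker blocked in `deq` wakes up as soon as a job or a shutdown signal arrives.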



# File 'lib/AGIServer.rb', line 172

def run(&block)
  @logger.info{"AGIServer Initializing Monitor Thread"}
  @monitor = Thread.new do
    poll = 0
    while ! @shutdown do
      poll += 1
      if (@workers.length < @max_workers and @worker_queue.length > 0) or ( @workers.length < @min_workers ) then
        @logger.info{"AGIServer Starting Worker Thread to handle requests"}

        #Begin Worker Thread
        worker_thread = Thread.new do
          @jobs_per_worker.times do
            client = @worker_queue.deq
            break if client.nil?
            @logger.debug{"AGIServer Worker received Connection"}
            agi = AGI.new({ :input => client, :output => client, :logger => @logger })
            begin
              agi.init
              params = @params
              if block.nil?
                router = AGIRouter.new(agi.channel_params['request'])
                router.route(agi, params)
              else
                if block.arity == 2
                  yield(agi, params)                    
                elsif block.arity == 1
                  yield(agi)
                end
              end
            rescue AGIHangupError => error
              @logger.error{"AGIServer Worker Caught Unhandled Hangup: #{error}"}
            rescue AGIError => error
              @logger.error{"AGIServer Worker Caught Unhandled Exception: #{error.class} #{error.to_s}"}
            rescue Exception => error
              @logger.error{"AGIServer Worker Got Unhandled Exception: #{error.class} #{error}"}
            ensure
              client.close
              @logger.debug{"AGIServer Worker done with Connection"}
            end
          end
          @workers.delete(Thread.current)
          @logger.info{"AGIServer Worker handled last Connection, terminating"}
        end
        #End Worker Thread
        
        @workers << worker_thread
        next #Short Circuit back without a sleep in case we need more threads for load
      end
      if @stats and poll % 10 == 0 then
        @logger.debug{"AGIServer #{@workers.length} active workers, #{@worker_queue.length} jobs waiting"} 
      end
      sleep 1
    end
    @logger.debug{"AGIServer Signaling all Worker Threads to finish up and exit"}
    @workers.length.times{ @worker_queue.enq(nil) }
    @workers.each { |worker| worker.join }
    @logger.debug{"AGIServer Final Worker Thread closed"}
  end

  @logger.info{"AGIServer Initializing Listener Thread"}
  @listener = Thread.new do
    begin
      while( client = @listen_socket.accept )
        @logger.debug{"AGIServer Listener received Connection Request"}
        @worker_queue.enq(client)
      end
    rescue IOError
      # Occurs on socket shutdown.
    end
  end
end
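The worker's choice between the one-argument and two-argument block forms relies on `Proc#arity`. The sketch below isolates that check; `call_handler` is a hypothetical helper written for this example, and the symbols passed in stand in for a real AGI object and params.

```ruby
# Hypothetical helper mirroring the arity check in the worker loop:
# a two-parameter block receives (agi, params), a one-parameter block
# receives only agi, and any other arity is not invoked at all.
def call_handler(agi, params, &block)
  case block.arity
  when 2 then block.call(agi, params)
  when 1 then block.call(agi)
  end
end

with_params = call_handler(:agi, { greeting: 'hello' }) { |agi, prm| [agi, prm[:greeting]] }
agi_only    = call_handler(:agi, { greeting: 'hello' }) { |agi| agi }
p with_params  # prints [:agi, "hello"]
p agi_only     # prints :agi
```

One consequence worth noting is that a block taking no parameters, or one using optional or splat parameters, reports a different arity and is silently skipped by this kind of check.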

#shutdown ⇒ Object

Closes the listener socket, so that no new requests will be accepted. Signals the Monitor thread to shut down its Workers when they are done with their current clients.
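To illustrate why closing the listener socket is enough to stop the accept loop, the sketch below runs a small listener thread and closes its socket from the main thread. It is a standalone approximation under assumed conditions (loopback address, an OS-assigned port, one throwaway client), not the server's own code; the `:socket_closed` return value is invented for the example.

```ruby
require 'socket'

server   = TCPServer.new('127.0.0.1', 0)  # port 0 lets the OS pick a free port
port     = server.addr[1]
accepted = Queue.new

listener = Thread.new do
  begin
    while (client = server.accept)
      client.close
      accepted << :connection
    end
  rescue IOError
    # Raised once the listening socket has been closed.
    :socket_closed
  end
end

TCPSocket.new('127.0.0.1', port).close  # one client connects, then disconnects
accepted.pop                            # wait until the listener has handled it
server.close                            # stop accepting, as shutdown does
listener_result = listener.value
p listener_result
```

The rescue clause plays the same role as the empty `rescue IOError` in the Listener Thread: the exception is the expected end of the loop rather than an error.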



# File 'lib/AGIServer.rb', line 253

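def shutdown
  @logger.info{"AGIServer Shutting down gracefully"}
  # IO#close returns nil, so log on a separate line rather than chaining with &&
  @listen_socket.close
  @logger.info{"AGIServer No longer accepting connections"}
  # Assign the flag directly so it does not depend on the logger's return value
  @shutdown = true
  @logger.info{"AGIServer Signaling Monitor to close after active sessions complete"}
end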

#stop ⇒ Object

Closes the listener socket, so that no new requests will be accepted. Signals the Monitor thread to shut down its Workers when they are done with their current clients.



# File 'lib/AGIServer.rb', line 258

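def shutdown
  @logger.info{"AGIServer Shutting down gracefully"}
  # IO#close returns nil, so log on a separate line rather than chaining with &&
  @listen_socket.close
  @logger.info{"AGIServer No longer accepting connections"}
  # Assign the flag directly so it does not depend on the logger's return value
  @shutdown = true
  @logger.info{"AGIServer Signaling Monitor to close after active sessions complete"}
end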