Class: Arachni::RPC::Server::Dispatcher
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/arachni/rpc/server/dispatcher.rb
Overview
Dispatches RPC Instances on demand, providing a centralized environment for multiple clients and allowing for extensive process monitoring.
The process goes something like this:
- On initialization the Dispatcher populates the Instance pool.
- A client issues a #dispatch call.
- The Dispatcher pops an Instance from the pool
- Asynchronously replenishes the pool
- Gives the Instance credentials to the client (url, auth token, etc.)
- The client connects to the Instance using these credentials.
Once the client finishes using the RPC Instance, it must shut the Instance down; otherwise the system will be eaten away by zombie RPC Instance processes.
Defined Under Namespace
Constant Summary collapse
- SERVICE_NAMESPACE =
Service
Instance Method Summary collapse
-
#alive? ⇒ TrueClass
True.
-
#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash, ...
Dispatches an Instance from the pool.
-
#finished_jobs ⇒ Array<Hash>
Returns info for all finished jobs.
-
#initialize(options = Options.instance) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#job(pid) ⇒ Hash
Returns proc info for a given pid.
-
#jobs ⇒ Array<Hash>
Returns info for all jobs.
-
#log ⇒ String
Contents of the log file.
-
#preferred(&block) ⇒ String
URL of the least burdened Dispatcher.
-
#running_jobs ⇒ Array<Hash>
Returns info for all running jobs.
- #services ⇒ Object
-
#statistics ⇒ Hash
Returns server stats regarding the jobs and pool.
-
#workload_score ⇒ Float
Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.
Methods included from UI::Output
#caller_location, #debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, #disable_only_positives, #error_buffer, #error_log_fd, #error_logfile, #has_error_log?, #included, #log_error, #mute, #muted?, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error, #print_error_backtrace, #print_exception, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_error_logfile, #unmute, #verbose?, #verbose_off, #verbose_on
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #cookie_decode, #cookie_encode, #cookies_from_file, #cookies_from_parser, #cookies_from_response, #exception_jail, #exclude_path?, #follow_protocol?, #form_decode, #form_encode, #forms_from_parser, #forms_from_response, #full_and_absolute_url?, #generate_token, #get_path, #hms_to_seconds, #html_decode, #html_encode, #include_path?, #links_from_parser, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_set_cookie, #path_in_domain?, #path_too_deep?, #port_available?, #rand_port, #random_seed, #redundant_path?, #regexp_array_match, #remove_constants, #request_parse_body, #seconds_to_hms, #skip_page?, #skip_path?, #skip_resource?, #skip_response?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parse_query, #uri_parser, #uri_rewrite
Constructor Details
#initialize(options = Options.instance) ⇒ Dispatcher
Returns a new instance of Dispatcher.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 46
# Sets up the RPC server, fills the Instance pool and, once every pooled
# Instance has come on-line, registers the services and the grid node and
# starts serving.
#
# @param options [Options] configuration to use; defaults to the global
#   `Options.instance`. Note that missing `dispatcher.external_address` and
#   `snapshot.save_path` values are filled in on this object in place.
def initialize( options = Options.instance )
    @options = options

    # Fall back to the RPC bind address / default snapshot directory when
    # no explicit values were configured.
    @options.dispatcher.external_address ||= @options.rpc.server_address
    @options.snapshot.save_path          ||= @options.paths.snapshots

    @server = Base.new( @options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}"

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @options.datastore.dispatcher_url = @url

    # NOTE(review): presumably this is what sets @logfile (used below) --
    # confirm against prep_logging.
    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs          = []
    @consumed_pids = []
    @pool          = Reactor.global.create_queue

    print_status "Populating the pool with #{@options.dispatcher.pool_size} Instances."
    if @options.dispatcher.pool_size > 0
        @options.dispatcher.pool_size.times { add_instance_to_pool( false ) }
    end

    print_status 'Waiting for Instances to come on-line.'

    # Check up on the pool and start the server once it has been filled.
    Reactor.global.at_interval( 0.1 ) do |task|
        print_debug "Instances: #{@pool.size}/#{@options.dispatcher.pool_size}"

        # Keep polling until the pool holds exactly the configured amount.
        next if @options.dispatcher.pool_size != @pool.size
        task.done

        print_status 'Instances are on-line.'

        _services.each do |name, service|
            @server.add_handler( name, service.new( @options, self ) )
        end

        @node = Node.new( @options, @logfile )
        @server.add_handler( 'node', @node )

        run
    end
end
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
111 112 113 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 111
# Liveness probe; simply forwards the question to the underlying RPC server.
#
# @return [TrueClass] the result of `@server.alive?`
def alive?
    @server.alive?
end
#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash, ...
Dispatches an Instance from the pool.
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 155
# Hands an Instance from the pool over to a client.
#
# The outcome is delivered through the block:
#   * the job Hash of the dispatched Instance on success,
#   * `false` when the pool is currently empty,
#   * `nil` when this Dispatcher is configured with a pool size of zero or less.
#
# @param owner [#to_s] label identifying who requested the Instance
# @param helpers [Hash] data stored with the job under the 'helpers' key
# @param load_balance [Bool] when true and this node is a grid member, the
#   request is forwarded to the Dispatcher reported by {#preferred}
# @param block [Block] receives the outcome described above
def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
    # Grid mode: let the preferred Dispatcher serve the request, with load
    # balancing switched off for the forwarded call.
    if load_balance && @node.grid_member?
        preferred { |url| connect_to_peer( url ).dispatch( owner, helpers, false, &block ) }
        return
    end

    # This Dispatcher does not keep a pool at all.
    unless @options.dispatcher.pool_size > 0
        block.call( nil )
        return
    end

    if @pool.empty?
        block.call( false )
    else
        @pool.pop do |instance_info|
            instance_info['owner']     = owner.to_s
            instance_info['starttime'] = Time.now.to_s
            instance_info['helpers']   = helpers

            print_status "Instance dispatched -- PID: #{instance_info['pid']} - Port: #{instance_info['port']} - Owner: #{instance_info['owner']}"

            @jobs << instance_info
            block.call( instance_info )
        end
    end

    # Schedule a replacement Instance regardless of whether one was handed out.
    Reactor.global.schedule { add_instance_to_pool }
end
#finished_jobs ⇒ Array<Hash>
Returns info for all finished jobs.
226 227 228 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 226
# Lists the jobs whose 'alive' flag is falsy.
#
# @return [Array<Hash>] info for all finished jobs
def finished_jobs
    jobs.select do |entry|
        !entry['alive']
    end
end
#job(pid) ⇒ Hash
Returns proc info for a given pid
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 192
# Builds an up-to-date info Hash for the job with the given PID.
#
# The stored job entry is duplicated, so the timing and liveness fields
# added here never leak back into the internal job list.
#
# @param pid [Integer] PID of the dispatched Instance
# @return [Hash, nil] the job's data plus 'currtime', 'age', 'runtime' and
#   'alive', or `nil` when no job with that PID is known
def job( pid )
    # `find` yields nil on a miss, whereas falling out of an `each` loop
    # would hand the caller the entire internal job list.
    record = @jobs.find { |j| j['pid'] == pid }
    return nil unless record

    cjob     = record.dup
    currtime = Time.now

    cjob['currtime'] = currtime.to_s
    cjob['age']      = currtime - Time.parse( cjob['birthdate'] )
    cjob['runtime']  = currtime - Time.parse( cjob['starttime'] )
    cjob['alive']    = Arachni::Processes::Manager.alive?( pid )

    cjob
end
#jobs ⇒ Array<Hash>
Returns info for all jobs.
210 211 212 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 210
# Collects refreshed info (see #job) for every known job, skipping any
# lookup that comes back as nil.
#
# @return [Array<Hash>] info for all jobs
def jobs
    @jobs.each_with_object( [] ) do |entry, collected|
        info = job( entry['pid'] )
        collected << info unless info.nil?
    end
end
#log ⇒ String
Returns the contents of the log file.
261 262 263 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 261
# Reads back whatever file `prep_logging` points at.
# NOTE(review): presumably `prep_logging` returns the log file's path --
# confirm against its definition.
#
# @return [String] contents of the log file
def log
    logfile_path = prep_logging
    IO.read( logfile_path )
end
#preferred(&block) ⇒ String
Passes the URL of the least burdened Dispatcher to the given block. If this Dispatcher is not a grid member, its own URL is passed instead.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 118
# Determines which Dispatcher should serve the next request and passes its
# URL to the block. Lower workload scores win.
#
# Outside of a grid this Dispatcher's own URL is reported straight away.
#
# @param block [Block] called with the URL (String) of the chosen Dispatcher
def preferred( &block )
    unless @node.grid_member?
        block.call( @url )
        return
    end

    # Ask one neighbour for its score; a missing score or an RPC exception
    # turns into nil so the peer can be dropped later.
    collect_score = proc do |peer_url, iterator|
        connect_to_peer( peer_url ).workload_score do |score|
            usable = score && !score.rpc_exception?
            iterator.return( usable ? [peer_url, score] : nil )
        end
    end

    # Once every neighbour has answered: discard the failures, add ourselves
    # to the candidates and report the one with the lowest score.
    pick_best = proc do |scored_peers|
        scored_peers.compact!
        scored_peers << [@url, workload_score]

        winner = scored_peers.sort_by { |_, score| score }.first
        block.call( winner.first )
    end

    Reactor.global.create_iterator( @node.neighbours ).map( collect_score, pick_best )
end
#running_jobs ⇒ Array<Hash>
Returns info for all running jobs.
218 219 220 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 218
# Lists the jobs whose 'alive' flag is truthy.
#
# @return [Array<Hash>] info for all running jobs
def running_jobs
    jobs.select do |entry|
        entry['alive']
    end
end
#services ⇒ Object
105 106 107 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 105
# Lists the keys of the `_services` collection.
# NOTE(review): presumably these are the names under which the services are
# registered as RPC handlers -- confirm against `_services`.
#
# @return [Object] the result of `_services.keys`
def services
    _services.keys
end
#statistics ⇒ Hash
Returns server stats regarding the jobs and pool.
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 243
# Assembles a report covering the jobs, the pool, stored snapshots and the
# grid node.
#
# @return [Hash] server stats with the keys 'running_jobs', 'finished_jobs',
#   'init_pool_size', 'curr_pool_size', 'consumed_pids', 'snapshots', 'node'
#   and 'neighbours'; the 'node' entry additionally carries a 'score'
def statistics
    report = {
        'running_jobs'   => running_jobs,
        'finished_jobs'  => finished_jobs,
        'init_pool_size' => @options.dispatcher.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids,
        # The pattern is the save path with '*.afs' appended directly.
        'snapshots'      => Dir.glob( "#{@options.snapshot.save_path}*.afs" ),
        'node'           => @node.info,
        'neighbours'     => @node.neighbours
    }

    # Written into the very object returned by @node.info.
    report['node']['score'] = workload_score
    report
end
#workload_score ⇒ Float
Returns the workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.
Lower is better.
235 236 237 238 239 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 235
# Computes how burdened this Dispatcher is: the number of running jobs plus
# one, multiplied by the node's 'weight' when one is set. Lower is better.
#
# @return [Float] workload score
def workload_score
    base = (running_jobs.size + 1).to_f

    # Without a configured weight the job count alone is the score.
    return base unless @node.info['weight']

    base * @node.info['weight'].to_f
end