Class: Arachni::RPC::Server::Dispatcher
- Includes:
- UI::Output, Utilities, Sys
- Defined in:
- lib/arachni/rpc/server/dispatcher.rb
Overview
Dispatches RPC Instances on demand providing a centralized environment for multiple clients and allows for extensive process monitoring.
The process goes something like this:
-
On initialization the Dispatcher populates the Instance pool.
-
A client issues a #dispatch call.
-
The Dispatcher pops an Instance from the pool
-
Asynchronously replenishes the pool
-
Gives the Instance credentials to the client (url, auth token, etc.)
-
-
The client connects to the Instance using these credentials.
Once the client finishes using the RPC Instance he must shut it down otherwise the system will be eaten away by zombie RPC Instance processes.
Defined Under Namespace
Constant Summary collapse
- HANDLER_NAMESPACE =
Handler
Instance Method Summary collapse
-
#alive? ⇒ TrueClass
True.
-
#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash
Dispatches an Instance from the pool.
-
#finished_jobs ⇒ Array<Hash>
Returns proc info for all finished jobs.
- #handlers ⇒ Object
-
#initialize(opts = Options.instance) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#job(pid) ⇒ Hash
Returns proc info for a given pid.
-
#jobs ⇒ Array<Hash>
Returns proc info for all jobs.
-
#log ⇒ String
Contents of the log file.
-
#preferred(&block) ⇒ String
URL of the least burdened Dispatcher.
-
#proc_info ⇒ Hash
The server’s proc info.
-
#running_jobs ⇒ Array<Hash>
Returns proc info for all running jobs.
-
#stats ⇒ Hash
Returns server stats regarding the jobs and pool.
-
#workload_score ⇒ Float
Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.
Methods included from UI::Output
#debug?, #debug_off, #debug_on, #disable_only_positives, #error_logfile, #flush_buffer, #log_error, #mute, #muted?, old_reset_output_options, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_buffer_cap, #set_error_logfile, #uncap_buffer, #unmute, #verbose, #verbose?
Methods included from Utilities
#available_port, #cookie_encode, #cookies_from_document, #cookies_from_file, #cookies_from_response, #exception_jail, #exclude_path?, #extract_domain, #follow_protocol?, #form_decode, #form_encode, #form_parse_request_body, #forms_from_document, #forms_from_response, #generate_token, #get_path, #html_decode, #html_encode, #include_path?, #links_from_document, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_query, #parse_set_cookie, #parse_url_vars, #path_in_domain?, #path_too_deep?, #port_available?, #rand_port, #redundant_path?, #remove_constants, #seed, #skip_page?, #skip_path?, #skip_resource?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize
Constructor Details
#initialize(opts = Options.instance) ⇒ Dispatcher
Returns a new instance of Dispatcher.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 58 def initialize( opts = Options.instance ) @opts = opts @opts.rpc_port ||= 7331 @opts.rpc_address ||= 'localhost' @opts.rpc_external_address ||= @opts.rpc_address @opts.pool_size ||= 5 if @opts.help print_help exit 0 end @server = Base.new( @opts ) @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level] @server.add_async_check do |method| # methods that expect a block are async method.parameters.flatten.include? :block end @url = "#{@opts.rpc_external_address}:#{@opts.rpc_port.to_s}" # let the instances in the pool know who to ask for routing instructions # when we're in grid mode. @opts.datastore[:dispatcher_url] = @url.dup prep_logging print_status 'Initing RPC Server...' @server.add_handler( 'dispatcher', self ) # trap interrupts and exit cleanly when required trap_interrupts { shutdown } @jobs = [] @consumed_pids = [] @pool = ::EM::Queue.new if @opts.pool_size > 0 print_status 'Warming up the pool...' @opts.pool_size.times { add_instance_to_pool } end print_status 'Initialization complete.' @node = Node.new( @opts, @logfile ) @server.add_handler( 'node', @node ) _handlers.each do |name, handler| @server.add_handler( name, handler.new( @opts, self ) ) end run end |
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
122 123 124 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 122 def alive? @server.alive? end |
#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash
Dispatches an Instance from the pool.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 161 def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block ) if load_balance && @node.grid_member? preferred do |url| connect_to_peer( url ).dispatch( owner, helpers, false, &block ) end return end if @opts.pool_size <= 0 block.call false return end @pool.pop do |cjob| cjob['owner'] = owner.to_s cjob['starttime'] = Time.now cjob['helpers'] = helpers print_status "Instance dispatched -- PID: #{cjob['pid']} - " + "Port: #{cjob['port']} - Owner: #{cjob['owner']}" @jobs << cjob block.call cjob end ::EM.next_tick { add_instance_to_pool } end |
#finished_jobs ⇒ Array<Hash>
Returns proc info for all finished jobs.
229 230 231 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 229 def finished_jobs jobs.select { |job| job['proc'].empty? } end |
#handlers ⇒ Object
117 118 119 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 117 def handlers _handlers.keys end |
#job(pid) ⇒ Hash
Returns proc info for a given pid
196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 196 def job( pid ) @jobs.each do |j| next if j['pid'] != pid cjob = j.dup cjob['currtime'] = Time.now cjob['age'] = cjob['currtime'] - cjob['birthdate'] cjob['runtime'] = cjob['currtime'] - cjob['starttime'] cjob['proc'] = proc_hash( cjob['pid'] ) return cjob end end |
#jobs ⇒ Array<Hash>
Returns proc info for all jobs.
211 212 213 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 211 def jobs @jobs.map { |cjob| job( cjob['pid'] ) }.compact end |
#log ⇒ String
Returns contents of the log file.
263 264 265 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 263 def log IO.read prep_logging end |
#preferred(&block) ⇒ String
URL of the least burdened Dispatcher. If not a grid member it will return this Dispatcher’s URL.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 129 def preferred( &block ) if !@node.grid_member? block.call @url return end each = proc do |neighbour, iter| connect_to_peer( neighbour ).workload_score do |score| iter.return score.rpc_exception? ? nil : [neighbour, score] end end after = proc do |nodes| nodes.compact! nodes << [@url, workload_score] block.call nodes.sort_by { |_, score| score }[0][0] end ::EM::Iterator.new( @node.neighbours ).map( each, after ) end |
#proc_info ⇒ Hash
Returns the server’s proc info.
268 269 270 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 268 def proc_info proc_hash( Process.pid ).merge( 'node' => @node.info ) end |
#running_jobs ⇒ Array<Hash>
Returns proc info for all running jobs.
220 221 222 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 220 def running_jobs jobs.reject { |job| job['proc'].empty? } end |
#stats ⇒ Hash
Returns server stats regarding the jobs and pool.
247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 247 def stats stats_h = { 'running_jobs' => running_jobs, 'finished_jobs' => finished_jobs, 'init_pool_size' => @opts.pool_size, 'curr_pool_size' => @pool.size, 'consumed_pids' => @consumed_pids } stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours ) stats_h['node']['score'] = workload_score stats_h end |
#workload_score ⇒ Float
Returns Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.
Lower is better.
239 240 241 242 243 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 239 def workload_score score = (running_jobs.size + 1).to_f score *= @node.info['weight'].to_f if @node.info['weight'] score end |