Class: Arachni::RPC::Server::Dispatcher

Inherits:
Object
Includes:
UI::Output, Utilities, Sys
Defined in:
lib/arachni/rpc/server/dispatcher.rb

Overview

Dispatches RPC Instances on demand, providing a centralized environment for multiple clients and allowing for extensive process monitoring.

The process goes something like this:

  • On initialization the Dispatcher populates the Instance pool.

  • A client issues a #dispatch call.

  • The Dispatcher pops an Instance from the pool

    • Asynchronously replenishes the pool

    • Gives the Instance credentials to the client (url, auth token, etc.)

  • The client connects to the Instance using these credentials.

Once the client has finished using the RPC Instance, it must shut the Instance down; otherwise zombie RPC Instance processes will accumulate and eat away at system resources.
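
The lifecycle above can be sketched in plain Ruby. This is a minimal, synchronous illustration under stated assumptions, not Arachni's implementation: `ToyDispatcher`, its private `add_instance_to_pool` body, the port numbers and the `'url'`/`'token'` keys are all hypothetical, and a standard `Queue` stands in for the `EM::Queue` used by the real Dispatcher.

```ruby
require 'securerandom'

# Hypothetical stand-in for the Dispatcher; not part of Arachni.
class ToyDispatcher
  attr_reader :pool

  def initialize(pool_size = 2)
    @pool      = Queue.new
    @next_port = 50_000
    # Step 1: populate the pool on initialization.
    pool_size.times { add_instance_to_pool }
  end

  # Steps 2-3: pop an Instance, replenish the pool, hand over credentials.
  def dispatch(owner = 'unknown')
    return false if @pool.empty? # nothing to hand out

    instance = @pool.pop
    add_instance_to_pool
    instance.merge('owner' => owner)
  end

  private

  # Pretends to spawn an Instance; it only records made-up connection details.
  def add_instance_to_pool
    @next_port += 1
    @pool << { 'url' => "localhost:#{@next_port}", 'token' => SecureRandom.hex(8) }
  end
end

dispatcher = ToyDispatcher.new(2)
info = dispatcher.dispatch('example-client')
puts "Connect to #{info['url']} with token #{info['token']}"
# Step 4 is up to the client: connect, use the Instance, then shut it down.
```

The pool size is the same before and after the call because each dispatch is paired with a replenishment; the real Dispatcher defers that replenishment to the next reactor tick instead of doing it inline.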

Defined Under Namespace

Classes: Handler, Node

Constant Summary

HANDLER_NAMESPACE = Handler

Instance Method Summary

Methods included from UI::Output

#debug?, #debug_off, #debug_on, #disable_only_positives, #error_logfile, #flush_buffer, #log_error, #mute, #muted?, old_reset_output_options, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_buffer_cap, #set_error_logfile, #uncap_buffer, #unmute, #verbose, #verbose?

Methods included from Utilities

#available_port, #cookie_encode, #cookies_from_document, #cookies_from_file, #cookies_from_response, #exception_jail, #exclude_path?, #extract_domain, #follow_protocol?, #form_decode, #form_encode, #form_parse_request_body, #forms_from_document, #forms_from_response, #generate_token, #get_path, #html_decode, #html_encode, #include_path?, #links_from_document, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_query, #parse_set_cookie, #parse_url_vars, #path_in_domain?, #path_too_deep?, #port_available?, #rand_port, #redundant_path?, #remove_constants, #seed, #skip_page?, #skip_path?, #skip_resource?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize

Constructor Details

#initialize(opts = Options.instance) ⇒ Dispatcher

Returns a new instance of Dispatcher.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 58

def initialize( opts = Options.instance )
    banner

    @opts = opts

    @opts.rpc_port             ||= 7331
    @opts.rpc_address          ||= 'localhost'
    @opts.rpc_external_address ||= @opts.rpc_address
    @opts.pool_size            ||= 5

    if @opts.help
        print_help
        exit 0
    end

    @server = Base.new( @opts )
    @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level]

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    @url = "#{@opts.rpc_external_address}:#{@opts.rpc_port.to_s}"

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @opts.datastore[:dispatcher_url] = @url.dup

    prep_logging

    print_status 'Initing RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs          = []
    @consumed_pids = []
    @pool          = ::EM::Queue.new

    if @opts.pool_size > 0
        print_status 'Warming up the pool...'
        @opts.pool_size.times { add_instance_to_pool }
    end

    print_status 'Initialization complete.'

    @node = Node.new( @opts, @logfile )
    @server.add_handler( 'node', @node )

    _handlers.each do |name, handler|
        @server.add_handler( name, handler.new( @opts, self ) )
    end

    run
end
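
The async check registered in the constructor relies on Ruby's `Method#parameters`. The sketch below shows how that predicate behaves on a few methods; `SampleHandler` and `ASYNC_CHECK` are hypothetical names used only for illustration.

```ruby
# The same predicate the constructor passes to add_async_check,
# stored in a constant so it can be called directly.
ASYNC_CHECK = ->(meth) { meth.parameters.flatten.include?(:block) }

# Hypothetical handler used only for this illustration.
class SampleHandler
  def sync_call(a, b = 1)
    a + b
  end

  def async_call(a, &block)
    block.call(a)
  end

  # A regular argument that merely happens to be named `block`.
  def misleading(block)
    block
  end
end

handler = SampleHandler.new

p handler.method(:sync_call).parameters  # [[:req, :a], [:opt, :b]]
p handler.method(:async_call).parameters # [[:req, :a], [:block, :block]]

puts ASYNC_CHECK.call(handler.method(:sync_call))  # false
puts ASYNC_CHECK.call(handler.method(:async_call)) # true
puts ASYNC_CHECK.call(handler.method(:misleading)) # true
```

The last case is worth noting: because the parameter list is flattened, a plain argument named `block` also satisfies the check, so the predicate depends on handlers following that naming convention.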

Instance Method Details

#alive? ⇒ TrueClass

Returns true.

Returns:

  • (TrueClass)

    true



# File 'lib/arachni/rpc/server/dispatcher.rb', line 122

def alive?
    @server.alive?
end

#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash

Dispatches an Instance from the pool.

Parameters:

  • owner (String) (defaults to: 'unknown')

    An owner to assign to the Instance.

  • helpers (Hash) (defaults to: {})

    Hash of helper data to be added to the job.

  • load_balance (Boolean) (defaults to: true)

    Whether to return an Instance from the least burdened Dispatcher (when in Grid mode) rather than from this one directly.

Returns:

  • (Hash)

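    Includes port number, owner, clock info and proc info. The Hash is passed to the given block; the block receives false instead when the pool size is 0 or less.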



# File 'lib/arachni/rpc/server/dispatcher.rb', line 161

def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
    if load_balance && @node.grid_member?
        preferred do |url|
            connect_to_peer( url ).dispatch( owner, helpers, false, &block )
        end
        return
    end

    if @opts.pool_size <= 0
        block.call false
        return
    end

    @pool.pop do |cjob|
        cjob['owner']     = owner.to_s
        cjob['starttime'] = Time.now
        cjob['helpers']   = helpers

        print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
            "Port: #{cjob['port']} - Owner: #{cjob['owner']}"

        @jobs << cjob
        block.call cjob
    end

    ::EM.next_tick { add_instance_to_pool }
end
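
Because #dispatch hands its result to a block, callers need to handle both outcomes visible in the source: a job Hash, or false when the pool size is 0 or less. The helper below is a hypothetical sketch of such handling; `describe_dispatch` is not part of Arachni, and the sample Hash uses made-up values with only the keys that appear in the listing above.

```ruby
# Hypothetical helper: turns the value yielded by #dispatch into a message.
def describe_dispatch(result)
  # #dispatch yields false when no pool is configured.
  return 'No Instance available: the Dispatcher has no pool.' unless result

  "Instance on port #{result['port']} (PID #{result['pid']}) " \
    "assigned to #{result['owner']}"
end

# Made-up values in the shape assembled by the listing above.
sample = {
  'pid'       => 4242,
  'port'      => 50_001,
  'owner'     => 'example-client',
  'starttime' => Time.now,
  'helpers'   => {}
}

puts describe_dispatch(sample)
puts describe_dispatch(false)
```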

#finished_jobs ⇒ Array<Hash>

Returns proc info for all finished jobs.

Returns:

  • (Array<Hash>)

    Returns proc info for all finished jobs.

# File 'lib/arachni/rpc/server/dispatcher.rb', line 229

def finished_jobs
    jobs.select { |job| job['proc'].empty? }
end

#handlers ⇒ Object



# File 'lib/arachni/rpc/server/dispatcher.rb', line 117

def handlers
    _handlers.keys
end

#job(pid) ⇒ Hash

Returns proc info for the given pid.

Parameters:

  • pid (Fixnum)

Returns:

  • (Hash)

    proc info for the given pid



# File 'lib/arachni/rpc/server/dispatcher.rb', line 196

def job( pid )
    @jobs.each do |j|
        next if j['pid'] != pid
        cjob = j.dup

        cjob['currtime'] = Time.now
        cjob['age']      = cjob['currtime'] - cjob['birthdate']
        cjob['runtime']  = cjob['currtime'] - cjob['starttime']
        cjob['proc']     = proc_hash( cjob['pid'] )

        return cjob
    end
end
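
#job derives two durations from timestamps stored on the job: age counts from the 'birthdate' entry and runtime from 'starttime', which #dispatch sets when the Instance is handed out. Reading 'birthdate' as the moment the Instance was added to the pool is an assumption, since that code is not shown here. A minimal sketch of the arithmetic, using made-up timestamps and a hypothetical `with_clock_info` helper:

```ruby
# Hypothetical helper mirroring the clock fields that #job adds.
def with_clock_info(job, now = Time.now)
  info = job.dup # leave the stored job untouched, as #job does
  info['currtime'] = now
  info['age']      = now - info['birthdate'] # seconds since 'birthdate'
  info['runtime']  = now - info['starttime'] # seconds since dispatch
  info
end

birth = Time.at(1_000)
job   = { 'pid' => 4242, 'birthdate' => birth, 'starttime' => birth + 30 }

info = with_clock_info(job, birth + 90)
puts info['age']     # 90.0
puts info['runtime'] # 60.0
```

Subtracting one `Time` from another yields a Float number of seconds, so both values are durations rather than timestamps.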

#jobs ⇒ Array<Hash>

Returns proc info for all jobs.

Returns:

  • (Array<Hash>)

    Returns proc info for all jobs.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 211

def jobs
    @jobs.map { |cjob| job( cjob['pid'] ) }.compact
end

#log ⇒ String

Returns contents of the log file.

Returns:

  • (String)

    contents of the log file



# File 'lib/arachni/rpc/server/dispatcher.rb', line 263

def log
    IO.read prep_logging
end

#preferred(&block) ⇒ String

URL of the least burdened Dispatcher, passed to the given block. If this Dispatcher is not a grid member, its own URL is used.

Returns:

  • (String)

    URL of the least burdened Dispatcher, passed to the given block. If this Dispatcher is not a grid member, its own URL is used.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 129

def preferred( &block )
    if !@node.grid_member?
        block.call @url
        return
    end

    each = proc do |neighbour, iter|
        connect_to_peer( neighbour ).workload_score do |score|
            iter.return score.rpc_exception? ? nil : [neighbour, score]
        end
    end

    after = proc do |nodes|
        nodes.compact!
        nodes << [@url, workload_score]
        block.call nodes.sort_by { |_, score| score }[0][0]
    end

    ::EM::Iterator.new( @node.neighbours ).map( each, after )
end
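
The selection step in the `after` proc can be illustrated without EventMachine. The sketch below assumes the peer scores have already been collected, with nil standing for a peer whose call failed; `least_burdened` and the peer URLs are hypothetical. It uses `min_by` in place of the `sort_by { ... }[0][0]` in the source, which picks the same entry whenever the lowest score is unique.

```ruby
# Hypothetical helper mirroring the `after` proc in #preferred.
def least_burdened(peer_results, own_url, own_score)
  nodes = peer_results.compact   # drop peers whose call failed
  nodes << [own_url, own_score]  # this Dispatcher always competes
  nodes.min_by { |_, score| score }.first
end

peers = [
  ['dispatcher-a:7331', 3.0],
  nil,                           # unreachable peer
  ['dispatcher-b:7331', 1.0]
]

puts least_burdened(peers, 'localhost:7331', 2.0) # dispatcher-b:7331
puts least_burdened([nil], 'localhost:7331', 2.0) # localhost:7331
```

Appending the local entry after compaction guarantees the list is never empty, so a URL is always produced even if every neighbour is unreachable.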

#proc_info ⇒ Hash

Returns the server’s proc info.

Returns:

  • (Hash)

    the server’s proc info



# File 'lib/arachni/rpc/server/dispatcher.rb', line 268

def proc_info
    proc_hash( Process.pid ).merge( 'node' => @node.info )
end

#running_jobs ⇒ Array<Hash>

Returns proc info for all running jobs.

Returns:

  • (Array<Hash>)

    Returns proc info for all running jobs.

# File 'lib/arachni/rpc/server/dispatcher.rb', line 220

def running_jobs
    jobs.reject { |job| job['proc'].empty? }
end
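
#running_jobs and #finished_jobs split the same job list on one condition: whether the job's 'proc' Hash is empty. The sketch below shows that split on made-up records. `split_jobs` and the 'state' key are hypothetical, and treating an empty 'proc' Hash as "the process is gone" is an inference from the source, since `proc_hash` is not shown here.

```ruby
# Hypothetical helper: splits job records the way #running_jobs and
# #finished_jobs do, by checking whether 'proc' is empty.
def split_jobs(jobs)
  running  = jobs.reject { |job| job['proc'].empty? }
  finished = jobs.select { |job| job['proc'].empty? }
  [running, finished]
end

# Made-up job records for illustration.
jobs = [
  { 'pid' => 101, 'proc' => { 'state' => 'running' } },
  { 'pid' => 102, 'proc' => {} },
  { 'pid' => 103, 'proc' => { 'state' => 'sleeping' } }
]

running, finished = split_jobs(jobs)
p running.map { |job| job['pid'] }  # [101, 103]
p finished.map { |job| job['pid'] } # [102]
```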

#stats ⇒ Hash

Returns server stats regarding the jobs and pool.

Returns:

  • (Hash)

    Returns server stats regarding the jobs and pool.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 247

def stats
    stats_h = {
        'running_jobs'   => running_jobs,
        'finished_jobs'  => finished_jobs,
        'init_pool_size' => @opts.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids
    }

    stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
    stats_h['node']['score']  = workload_score

    stats_h
end

#workload_score ⇒ Float

Returns the workload score for this Dispatcher, calculated from the number of #running_jobs and the configured node weight.

Lower is better.

Returns:

  • (Float)

    Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.

    Lower is better.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 239

def workload_score
    score = (running_jobs.size + 1).to_f
    score *= @node.info['weight'].to_f if @node.info['weight']
    score
end
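
As a worked example of the formula, the sketch below reimplements the calculation as a standalone function so it can be run with sample numbers. The two-argument signature is an assumption made for illustration; the real method reads both values from the Dispatcher's own state.

```ruby
# Standalone version of the calculation: (running jobs + 1) * weight,
# where the weight is only applied when one is configured.
def workload_score(running_jobs_count, weight = nil)
  score = (running_jobs_count + 1).to_f
  score *= weight.to_f if weight
  score
end

puts workload_score(0)      # 1.0
puts workload_score(3)      # 4.0
puts workload_score(3, 0.5) # 2.0
puts workload_score(1, 2)   # 4.0
```

Adding 1 before multiplying means an idle Dispatcher scores its weight rather than 0, so the weight still influences selection when no jobs are running. Since lower is better, a weight below 1 makes a node more likely to be chosen.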