Class: Arachni::RPC::Server::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/arachni/rpc/server/dispatcher.rb

Overview

Dispatches RPC Instances on demand providing a centralized environment for multiple clients and allows for extensive process monitoring.

The process goes something like this:

  • On initialization the Dispatcher populates the Instance pool.
  • A client issues a #dispatch call.
  • The Dispatcher pops an Instance from the pool
    • Asynchronously replenishes the pool
    • Gives the Instance credentials to the client (url, auth token, etc.)
  • The client connects to the Instance using these credentials.

Once the client finishes using the RPC Instance he must shut it down otherwise the system will be eaten away by zombie RPC Instance processes.

Author:

Defined Under Namespace

Classes: Node, Service

Constant Summary collapse

SERVICE_NAMESPACE =
Service

Instance Method Summary collapse

Methods included from UI::Output

#caller_location, #debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, #disable_only_positives, #error_buffer, #error_log_fd, #error_logfile, #has_error_log?, #included, #log_error, #mute, #muted?, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error, #print_error_backtrace, #print_exception, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_error_logfile, #unmute, #verbose?, #verbose_off, #verbose_on

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #cookie_decode, #cookie_encode, #cookies_from_file, #cookies_from_parser, #cookies_from_response, #exception_jail, #exclude_path?, #follow_protocol?, #form_decode, #form_encode, #forms_from_parser, #forms_from_response, #full_and_absolute_url?, #generate_token, #get_path, #hms_to_seconds, #html_decode, #html_encode, #include_path?, #links_from_parser, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_set_cookie, #path_in_domain?, #path_too_deep?, #port_available?, #rand_port, #random_seed, #redundant_path?, #regexp_array_match, #remove_constants, #request_parse_body, #seconds_to_hms, #skip_page?, #skip_path?, #skip_resource?, #skip_response?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parse_query, #uri_parser, #uri_rewrite

Constructor Details

#initialize(options = Options.instance) ⇒ Dispatcher

Returns a new instance of Dispatcher.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/arachni/rpc/server/dispatcher.rb', line 46

def initialize( options = Options.instance )
    @options = options

    @options.dispatcher.external_address ||= @options.rpc.server_address
    @options.snapshot.save_path          ||= @options.paths.snapshots

    @server = Base.new( @options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}"

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @options.datastore.dispatcher_url = @url

    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs          = []
    @consumed_pids = []
    @pool          = Reactor.global.create_queue

    print_status "Populating the pool with #{@options.dispatcher.pool_size}  Instances."
    if @options.dispatcher.pool_size > 0
        @options.dispatcher.pool_size.times { add_instance_to_pool( false ) }
    end

    print_status 'Waiting for Instances to come on-line.'

    # Check up on the pool and start the server once it has been filled.
    Reactor.global.at_interval( 0.1 ) do |task|
        print_debug "Instances: #{@pool.size}/#{@options.dispatcher.pool_size}"
        next if @options.dispatcher.pool_size != @pool.size
        task.done

        print_status 'Instances are on-line.'

        _services.each do |name, service|
            @server.add_handler( name, service.new( @options, self ) )
        end

        @node = Node.new( @options, @logfile )
        @server.add_handler( 'node', @node )

        run
    end
end

Instance Method Details

#alive?TrueClass

Returns true.

Returns:

  • (TrueClass)

    true



111
112
113
# File 'lib/arachni/rpc/server/dispatcher.rb', line 111

def alive?
    @server.alive?
end

#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash, ...

Dispatches an Instance from the pool.

Parameters:

  • owner (String) (defaults to: 'unknown')

    An owner to assign to the Instance.

  • helpers (Hash) (defaults to: {})

    Hash of helper data to be added to the job.

  • load_balance (Boolean) (defaults to: true)

    Return an Instance from the least burdened Arachni::RPC::Server::Dispatcher (when in Grid mode) or from this one directly?

Returns:

  • (Hash, false, nil)

    Depending on availability:

    • Hash: Includes URL, owner, clock info and proc info.
    • false: Pool is currently empty, check back again in a few seconds.
    • nil: The Arachni::RPC::Server::Dispatcher was configured with a pool-size of 0.


155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/arachni/rpc/server/dispatcher.rb', line 155

def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
    if load_balance && @node.grid_member?
        preferred do |url|
            connect_to_peer( url ).dispatch( owner, helpers, false, &block )
        end
        return
    end

    if @options.dispatcher.pool_size <= 0
        block.call nil
        return
    end

    if @pool.empty?
        block.call false
    else
        @pool.pop do |cjob|
            cjob['owner']     = owner.to_s
            cjob['starttime'] = Time.now.to_s
            cjob['helpers']   = helpers

            print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
                "Port: #{cjob['port']} - Owner: #{cjob['owner']}"

            @jobs << cjob
            block.call cjob
        end
    end

    Reactor.global.schedule { add_instance_to_pool }
end

#finished_jobsArray<Hash>

Returns info for all finished jobs.

Returns:

  • (Array<Hash>)

    Returns info for all finished jobs.

See Also:



226
227
228
# File 'lib/arachni/rpc/server/dispatcher.rb', line 226

def finished_jobs
    jobs.reject { |job| job['alive'] }
end

#job(pid) ⇒ Hash

Returns proc info for a given pid

Parameters:

  • pid (Fixnum)

Returns:



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/arachni/rpc/server/dispatcher.rb', line 192

def job( pid )
    @jobs.each do |j|
        next if j['pid'] != pid
        cjob = j.dup

        currtime = Time.now

        cjob['currtime'] = currtime.to_s
        cjob['age']      = currtime - Time.parse( cjob['birthdate'] )
        cjob['runtime']  = currtime - Time.parse( cjob['starttime'] )
        cjob['alive']    = Arachni::Processes::Manager.alive?( pid )

        return cjob
    end
end

#jobsArray<Hash>

Returns info for all jobs.

Returns:

  • (Array<Hash>)

    Returns info for all jobs.



210
211
212
# File 'lib/arachni/rpc/server/dispatcher.rb', line 210

def jobs
    @jobs.map { |cjob| job( cjob['pid'] ) }.compact
end

#logString

Returns Contents of the log file.

Returns:

  • (String)

    Contents of the log file



261
262
263
# File 'lib/arachni/rpc/server/dispatcher.rb', line 261

def log
    IO.read prep_logging
end

#preferred(&block) ⇒ String

URL of the least burdened Dispatcher. If not a grid member it will return this Dispatcher's URL.

Returns:

  • (String)

    URL of the least burdened Dispatcher. If not a grid member it will return this Dispatcher's URL.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/arachni/rpc/server/dispatcher.rb', line 118

def preferred( &block )
    if !@node.grid_member?
        block.call @url
        return
    end

    each = proc do |neighbour, iter|
        connect_to_peer( neighbour ).workload_score do |score|
            iter.return (!score || score.rpc_exception?) ? nil : [neighbour, score]
        end
    end

    after = proc do |nodes|
        nodes.compact!
        nodes << [@url, workload_score]
        block.call nodes.sort_by { |_, score| score }[0][0]
    end

    Reactor.global.create_iterator( @node.neighbours ).map( each, after )
end

#running_jobsArray<Hash>

Returns info for all running jobs.

Returns:

  • (Array<Hash>)

    Returns info for all running jobs.

See Also:



218
219
220
# File 'lib/arachni/rpc/server/dispatcher.rb', line 218

def running_jobs
    jobs.select { |job| job['alive'] }
end

#servicesObject



105
106
107
# File 'lib/arachni/rpc/server/dispatcher.rb', line 105

def services
    _services.keys
end

#statisticsHash

Returns server stats regarding the jobs and pool.

Returns:

  • (Hash)

    Returns server stats regarding the jobs and pool.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/arachni/rpc/server/dispatcher.rb', line 243

def statistics
    stats_h = {
        'running_jobs'   => running_jobs,
        'finished_jobs'  => finished_jobs,
        'init_pool_size' => @options.dispatcher.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids,
        'snapshots'      => Dir.glob( "#{@options.snapshot.save_path}*.afs" )
    }

    stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
    stats_h['node']['score']  = workload_score

    stats_h
end

#workload_scoreFloat

Returns Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.

Lower is better.

Returns:

  • (Float)

    Workload score for this Dispatcher, calculated using the number of #running_jobs and the configured node weight.

    Lower is better.



235
236
237
238
239
# File 'lib/arachni/rpc/server/dispatcher.rb', line 235

def workload_score
    score = (running_jobs.size + 1).to_f
    score *= @node.info['weight'].to_f if @node.info['weight']
    score
end