Class: Arachni::Reactor

Inherits:
Object
Defined in:
lib/arachni/reactor.rb,
lib/arachni/reactor/queue.rb,
lib/arachni/reactor/tasks.rb,
lib/arachni/reactor/global.rb,
lib/arachni/reactor/version.rb,
lib/arachni/reactor/iterator.rb,
lib/arachni/reactor/connection.rb,
lib/arachni/reactor/tasks/base.rb,
lib/arachni/reactor/tasks/delayed.rb,
lib/arachni/reactor/tasks/one_off.rb,
lib/arachni/reactor/connection/tls.rb,
lib/arachni/reactor/tasks/periodic.rb,
lib/arachni/reactor/connection/error.rb,
lib/arachni/reactor/tasks/persistent.rb,
lib/arachni/reactor/connection/callbacks.rb,
lib/arachni/reactor/connection/peer_info.rb

Overview

Reactor scheduler and resource factory.

Defined Under Namespace

Classes: Connection, Error, Iterator, Queue, Tasks

Constant Summary

DEFAULT_OPTIONS =
{
    select_timeout:    0.02,
    max_tick_interval: 0.02
}
VERSION =
'0.1.3.1'

Constructor Details

#initialize(options = {}) ⇒ Reactor

Returns a new instance of Reactor.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_tick_interval (Float, nil) (defaults to: 0.02)

    How long to wait, in seconds, for each tick when no connections are available for processing.

  • :select_timeout (Float) (defaults to: 0.02)

    How long to wait, in seconds, for connection activity before continuing to the next tick.



# File 'lib/arachni/reactor.rb', line 130

def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end
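
The way caller-supplied options combine with DEFAULT_OPTIONS can be tried in isolation. The following is a minimal sketch that relies only on Hash#merge, the call the constructor makes; DEFAULTS and effective_options are hypothetical names used for illustration, and the values are copied from the constant summary so the snippet runs without the library.

```ruby
# Copy of the DEFAULT_OPTIONS values listed above (assumption: kept in sync by hand).
DEFAULTS = {
    select_timeout:    0.02,
    max_tick_interval: 0.02
}.freeze

# Hypothetical helper mirroring `DEFAULT_OPTIONS.merge( options )`.
def effective_options( options = {} )
    DEFAULTS.merge( options )
end

# Only the keys that are passed get overridden.
tuned = effective_options( select_timeout: 0.5 )

# An explicit nil also overrides the default, it does not fall back to it.
no_interval = effective_options( max_tick_interval: nil )
```

Because merge prefers the argument's values, passing max_tick_interval: nil replaces the default instead of restoring it, which matches the (Float, nil) type documented for that option.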

Instance Attribute Details

#connections ⇒ Array<Connection> (readonly)

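Returns attached connections.

Returns:

  • (Array<Connection>)

    Attached connections. The reader returns @connections as-is, which the constructor initializes as a Hash of Socket => Connection.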



# File 'lib/arachni/reactor.rb', line 76

def connections
  @connections
end

#max_tick_interval ⇒ Float?

Returns the amount of time to wait for a connection.

Returns:

  • (Float, nil)

    Amount of time to wait for a connection.



# File 'lib/arachni/reactor.rb', line 72

def max_tick_interval
  @max_tick_interval
end

#ticks ⇒ Integer (readonly)

Returns the number of ticks.

Returns:

  • (Integer)

    Number of ticks completed by the running loop; reset to 0 when the loop stops.



# File 'lib/arachni/reactor.rb', line 80

def ticks
  @ticks
end

Class Method Details

.global ⇒ Reactor

Returns the lazy-loaded, globally accessible Reactor.

Returns:

  • (Reactor)

    Lazy-loaded, globally accessible Reactor.



# File 'lib/arachni/reactor.rb', line 91

def global
    @reactor ||= Global.instance
end

.jruby? ⇒ Boolean

Returns true if running on JRuby (RUBY_PLATFORM is 'java'), false otherwise.

Returns:

  • (Boolean)


# File 'lib/arachni/reactor.rb', line 118

def jruby?
    RUBY_PLATFORM == 'java'
end

.stop ⇒ Object

Stops the global Reactor instance and destroys it. The next call to global will return a new instance.



# File 'lib/arachni/reactor.rb', line 97

def stop
    return if !@reactor

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @reactor = nil
end
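
The Singleton reset used above can be reproduced with nothing but the standard library. This is a minimal sketch, assuming the stdlib Singleton module keeps its cached object in the class-level instance variable @singleton__instance__ (an implementation detail, not a documented interface); DemoGlobal is a hypothetical stand-in for the library's Global class.

```ruby
require 'singleton'

# Hypothetical stand-in for the globally accessible reactor class.
class DemoGlobal
    include Singleton
end

first = DemoGlobal.instance

# Same call as in .stop: clear the cached instance so that the next
# call to .instance builds a fresh object.
DemoGlobal.instance_variable_set( :@singleton__instance__, nil )

second = DemoGlobal.instance
```

After the reset, first and second are different objects, while repeated calls to DemoGlobal.instance keep returning second.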

.supports_unix_sockets? ⇒ Boolean

Returns true if UNIX-domain sockets are available, false otherwise. Always false on JRuby.

Returns:

  • (Boolean)


# File 'lib/arachni/reactor.rb', line 110

def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end
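
The check above works because referencing an undefined constant raises NameError. The sketch below generalizes the same idea to an arbitrary constant name; constant_available? is a hypothetical helper, and it uses Object.const_get where the method above references UNIXSocket directly.

```ruby
# Hypothetical helper: true if a top-level constant with the given name
# can be resolved, false if resolving it raises NameError.
def constant_available?( name )
    !!Object.const_get( name )
rescue NameError
    false
end

has_string  = constant_available?( :String )
has_missing = constant_available?( :NoSuchConstantForThisSketch )
```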

Instance Method Details

#at_interval(interval, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • interval (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run at every interval seconds.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 454

def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end
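
The note about time accuracy follows from tasks only being considered once per tick. The following is a minimal sketch of that effect, not the library's Tasks::Periodic: DemoPeriodic is a hypothetical class, and the clock is simulated in integer milliseconds (rather than the Float seconds the real method takes) to keep the arithmetic exact.

```ruby
# Hypothetical tick-driven periodic task.
class DemoPeriodic
    def initialize( interval, &block )
        @interval = interval
        @block    = block
        @last_run = nil
    end

    # Called once per tick with the current time; runs the block only if at
    # least `interval` has elapsed since the previous run (or the first call).
    def call( now )
        @last_run ||= now
        return false if now - @last_run < @interval

        @last_run = now
        @block.call
        true
    end
end

fired_at = []
clock    = 0
task     = DemoPeriodic.new( 1000 ) { fired_at << clock }

# Ticks arrive every 300ms of simulated time, so the task can only fire on a
# tick boundary, which is later than the requested 1000ms.
10.times do
    task.call( clock )
    clock += 300
end
```

With a 1000ms interval and 300ms ticks the block runs at 1200ms and 2400ms, so each run is 200ms late relative to the requested interval.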

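#attach(connection) ⇒ Object

Attaches a connection to the Arachni::Reactor loop.

Parameters:

  • connection (Connection)

    Connection to attach.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.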



# File 'lib/arachni/reactor.rb', line 497

def attach( connection )
    return if attached? connection

    schedule do
        connection.reactor = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end

#attached?(connection) ⇒ Bool

Returns true if the connection is attached, false otherwise.

Returns:

  • (Bool)

    true if the connection is attached, false otherwise.



# File 'lib/arachni/reactor.rb', line 526

def attached?( connection )
    @connections.include? connection.to_io
end

#connect(host, port, handler = Connection, *handler_options) ⇒ Connection
#connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the handler's Arachni::Reactor::Connection::Callbacks#on_close method as a reason argument.

Connects to a peer.

Overloads:

  • #connect(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the #initialize method of the handler.

  • #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to connect.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the #initialize method of the handler.

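Returns:

  • (Connection)

    Handler instance for the new connection.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.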



# File 'lib/arachni/reactor.rb', line 190

def connect( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.reactor = self
    block.call connection if block_given?

    begin
        Connection::Error.translate do
            socket = options[:unix_socket] ?
                connect_unix( options[:unix_socket] ) : connect_tcp

            connection.configure options.merge( socket: socket, role: :client )
            attach connection
        end
    rescue Connection::Error => e
        connection.close e
    end

    connection
end

#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator

Returns a new Iterator with self as the scheduler.

Parameters:

  • list (#to_a)

    List to iterate.

  • concurrency (Integer) (defaults to: 1)

    Parallel workers to spawn.

Returns:

  • (Reactor::Iterator)

    New Iterator with self as the scheduler.



# File 'lib/arachni/reactor.rb', line 154

def create_iterator( list, concurrency = 1 )
    Reactor::Iterator.new( self, list, concurrency )
end

#create_queue ⇒ Reactor::Queue

Returns a new Queue with self as the scheduler.

Returns:

  • (Reactor::Queue)

    New Queue with self as the scheduler.



# File 'lib/arachni/reactor.rb', line 160

def create_queue
    Reactor::Queue.new self
end

#delay(time, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • time (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run in time seconds.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 468

def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end

#detach(connection) ⇒ Object

Detaches a connection from the Arachni::Reactor loop.

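Parameters:

  • connection (Connection)

    Connection to detach.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.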



# File 'lib/arachni/reactor.rb', line 514

def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.reactor = nil
    end
end

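#in_same_thread? ⇒ Bool

Returns true if the caller is in the same #thread as the reactor loop, false otherwise.

Returns:

  • (Bool)

    true if the caller is in the same #thread as the reactor loop, false otherwise.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.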



# File 'lib/arachni/reactor.rb', line 485

def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end

#listen(host, port, handler = Connection, *handler_options) ⇒ Connection
#listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the handler's Arachni::Reactor::Connection::Callbacks#on_close method as a reason argument.

Listens for incoming connections.

Overloads:

  • #listen(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the #initialize method of the handler.

  • #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to create.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the #initialize method of the handler.

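Returns:

  • (Connection)

    Handler instance for the listening server.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.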



# File 'lib/arachni/reactor.rb', line 248

def listen( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.reactor = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        Connection::Error.translate do
            socket = options[:unix_socket] ?
                listen_unix( options[:unix_socket] ) :
                listen_tcp( options[:host], options[:port] )

            server.configure options.merge( socket: socket, role: :server, server_handler: server_handler )
            attach server
        end
    rescue Connection::Error => e
        server.close e
    end

    server
end

#next_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at the next tick.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 440

def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end

#on_error(&block) ⇒ Object

Parameters:

  • block (Block)

    Passes exceptions raised in the Reactor #thread to a task.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 392

def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end

#on_shutdown(&block) ⇒ Object

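Parameters:

  • block (Block)

    Schedules a task to be run when the Reactor shuts down, after the loop has ended and its connections have been closed.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.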



# File 'lib/arachni/reactor.rb', line 430

def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end

#on_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at each tick.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 402

def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end

#run(&block) ⇒ Object

Starts the Arachni::Reactor loop and blocks the current #thread until #stop is called.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Raises:

  • (Error::AlreadyRunning)

    If the Reactor is already running.



# File 'lib/arachni/reactor.rb', line 298

def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end
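
The error-handling behavior of the loop above, where an exception raised by a task is handed to the error handlers and the loop keeps going, can be sketched without connections. DemoLoop below is a hypothetical, heavily reduced model: it keeps only the task phase of each tick and does not reset its tick counter on exit, unlike #run.

```ruby
# Hypothetical reduced model of the tick loop.
class DemoLoop
    attr_reader :ticks

    def initialize
        @tasks          = []
        @error_handlers = []
        @stop           = false
        @ticks          = 0
    end

    def on_tick( &block )
        @tasks << block
    end

    def on_error( &block )
        @error_handlers << block
    end

    def stop
        @stop = true
    end

    def run
        loop do
            begin
                @tasks.each( &:call )
            rescue => e
                # Exceptions do not end the loop, they go to the handlers.
                @error_handlers.each { |handler| handler.call( e ) }
            end
            break if @stop

            @ticks += 1
        end
    end
end

errors  = []
calls   = 0
reactor = DemoLoop.new

reactor.on_error { |e| errors << e.message }
reactor.on_tick do
    calls += 1
    raise 'boom' if calls == 1
    reactor.stop if calls == 3
end

reactor.run
```

The task raises on its first call, yet it is called twice more before the loop exits. The tick in which stop is requested is not counted, because the break happens before the counter is incremented.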

#run_block(&block) ⇒ Object

Starts the Reactor loop, blocks the current #thread while the given block executes and then #stops it.

Parameters:

  • block (Block)

    Block to call.

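Raises:

  • (ArgumentError)

    If no block is given.

  • (Error::AlreadyRunning)

    If the Reactor is already running.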



# File 'lib/arachni/reactor.rb', line 377

def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end

#run_in_thread(&block) ⇒ Thread

Runs the Reactor in a thread and blocks until it is #running?.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

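Returns:

  • (Thread)

    Thread of the Reactor loop.

Raises:

  • (Error::AlreadyRunning)

    If the Reactor is already running.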



# File 'lib/arachni/reactor.rb', line 344

def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end

#running? ⇒ Bool

Returns true if the Arachni::Reactor is running, false otherwise.

Returns:

  • (Bool)

    true if the Arachni::Reactor is running, false otherwise.



# File 'lib/arachni/reactor.rb', line 280

def running?
    thread && thread.alive?
end

#schedule(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run as soon as possible, either immediately if the caller is in the same thread, or at the #next_tick otherwise.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 414

def schedule( &block )
    fail_if_not_running

    if in_same_thread?
        block.call
    else
        next_tick(&block)
    end

    nil
end
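
The dispatch rule above (run immediately on the loop's own thread, defer otherwise) can be illustrated with a small model. This is a sketch under stated assumptions, not the library's implementation: DemoScheduler is hypothetical, the thread that creates it plays the role of the loop thread, and tick stands in for one loop iteration.

```ruby
# Hypothetical miniature of the same-thread / other-thread dispatch rule.
class DemoScheduler
    def initialize
        @owner   = Thread.current   # thread that "runs the loop"
        @pending = ::Queue.new      # thread-safe hand-off from other threads
    end

    def schedule( &block )
        if Thread.current == @owner
            block.call              # same thread: run immediately
        else
            @pending << block       # other thread: defer to the next tick
        end

        nil
    end

    # One loop iteration: run everything queued so far.
    def tick
        @pending.pop.call until @pending.empty?
    end
end

scheduler = DemoScheduler.new
log       = []

scheduler.schedule { log << :immediate }
Thread.new { scheduler.schedule { log << :deferred } }.join

before_tick = log.dup
scheduler.tick
```

The block scheduled from the owning thread has already run before tick is called, while the one scheduled from the other thread only runs during tick. Deferring keeps every task on the loop's thread, so tasks never execute concurrently with it.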

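#stop ⇒ Object

Stops the Arachni::Reactor loop as soon as possible.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.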



# File 'lib/arachni/reactor.rb', line 287

def stop
    schedule { @stop = true }
end

#thread ⇒ Thread?

Returns the thread of the loop, nil if not running.

Returns:

  • (Thread, nil)

    Thread of the loop, nil if not running.



# File 'lib/arachni/reactor.rb', line 476

def thread
    @thread
end

#wait ⇒ Object

Waits for the Reactor to stop #running?.

Raises:

  • (Error::NotRunning)

    If the Reactor is not running.



# File 'lib/arachni/reactor.rb', line 363

def wait
    fail_if_not_running

    @done_signal.pop
    true
end
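
The blocking behavior of #wait comes from popping a standard ::Queue that #run pushes to once the loop has finished. The hand-off can be sketched on its own as follows; the worker thread and the results array are illustrative stand-ins, not part of the library.

```ruby
done_signal = ::Queue.new
results     = []

worker = Thread.new do
    results << :loop_finished   # stands in for the loop body and shutdown tasks
    done_signal << nil          # same hand-off as `@done_signal << nil` in #run
end

# Blocks the calling thread until the worker has pushed its signal.
signal = done_signal.pop
worker.join
```

Queue#pop suspends the caller until an item is available, so no polling is needed; the pushed value itself (nil here) carries no information.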