Class: Arachni::Reactor
- Inherits:
-
Object
- Object
- Arachni::Reactor
- Defined in:
- lib/arachni/reactor.rb,
lib/arachni/reactor/queue.rb,
lib/arachni/reactor/tasks.rb,
lib/arachni/reactor/global.rb,
lib/arachni/reactor/version.rb,
lib/arachni/reactor/iterator.rb,
lib/arachni/reactor/connection.rb,
lib/arachni/reactor/tasks/base.rb,
lib/arachni/reactor/tasks/delayed.rb,
lib/arachni/reactor/tasks/one_off.rb,
lib/arachni/reactor/connection/tls.rb,
lib/arachni/reactor/tasks/periodic.rb,
lib/arachni/reactor/connection/error.rb,
lib/arachni/reactor/tasks/persistent.rb,
lib/arachni/reactor/connection/callbacks.rb,
lib/arachni/reactor/connection/peer_info.rb
Overview
Reactor scheduler and resource factory.
You're probably interested in:
- Getting access to a shared and globally accessible Reactor via .global --
that's probably what you want.
- Rest of the class methods can be used to manage it.
- Creating resources like queues (#create_queue), iterators (#create_iterator) and network connections (#connect, #listen).
Defined Under Namespace
Classes: Connection, Error, Iterator, Queue, Tasks
Constant Summary collapse
- DEFAULT_OPTIONS =
{ select_timeout: 0.02, max_tick_interval: 0.02 }
- VERSION =
'0.1.3.1'
Instance Attribute Summary collapse
-
#connections ⇒ Hash{Socket => Connection}
readonly
Attached connections.
-
#max_tick_interval ⇒ Integer?
Amount of time to wait for a connection.
-
#ticks ⇒ Integer
readonly
Amount of ticks.
Class Method Summary collapse
-
.global ⇒ Reactor
Lazy-loaded, globally accessible Reactor.
- .jruby? ⇒ Boolean
-
.stop ⇒ Object
Stops the global Reactor instance and destroys it.
- .supports_unix_sockets? ⇒ Boolean
Instance Method Summary collapse
- #at_interval(interval, &block) ⇒ Object
- #attach(connection) ⇒ Object
-
#attached?(connection) ⇒ Bool
true
if the connection is attached, false
otherwise. -
#connect(*args, &block) ⇒ Connection
Connects to a peer.
-
#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator
New Iterator with
self
as the scheduler. -
#create_queue ⇒ Reactor::Queue
New Queue with
self
as the scheduler. - #delay(time, &block) ⇒ Object
- #detach(connection) ⇒ Object
-
#in_same_thread? ⇒ Bool
true
if the caller is in the same #thread as the reactor loop, false
otherwise. -
#initialize(options = {}) ⇒ Reactor
constructor
A new instance of Reactor.
-
#listen(*args, &block) ⇒ Connection
Listens for incoming connections.
- #next_tick(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_shutdown(&block) ⇒ Object
- #on_tick(&block) ⇒ Object
- #run(&block) ⇒ Object
-
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the given
block
executes and then #stops it. - #run_in_thread(&block) ⇒ Thread
- #running? ⇒ Bool
- #schedule(&block) ⇒ Object
-
#stop ⇒ Object
Stops the Reactor loop as soon as possible.
-
#thread ⇒ Thread?
Thread of the loop,
nil
if not running. -
#wait ⇒ Object
Waits for the Reactor to stop #running?.
Constructor Details
#initialize(options = {}) ⇒ Reactor
Returns a new instance of Reactor.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/arachni/reactor.rb', line 130
#
# Builds a reactor that is not running: no connections, no loop thread, zero
# ticks and empty task lists.
#
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS; the keys
#   read here are :max_tick_interval and :select_timeout.
def initialize( options = {} )
  options = DEFAULT_OPTIONS.merge( options )

  @max_tick_interval = options[:max_tick_interval]
  @select_timeout    = options[:select_timeout]

  # Socket => Connection
  @connections = {}
  @stop        = false
  @ticks       = 0
  @thread      = nil
  @tasks       = Tasks.new

  @error_handlers = Tasks.new
  @shutdown_tasks = Tasks.new

  # Receives one item every time #run finishes; #wait blocks on it.
  @done_signal = ::Queue.new
end
Instance Attribute Details
#connections ⇒ Hash{Socket => Connection} (readonly)
Returns Attached connections.
76 77 78 |
# File 'lib/arachni/reactor.rb', line 76
#
# @return [Hash] attached connections. #attach stores each connection under
#   its `to_io` value, so this is a Socket => Connection map rather than a
#   plain list.
def connections
  @connections
end
#max_tick_interval ⇒ Integer?
Returns Amount of time to wait for a connection.
72 73 74 |
# File 'lib/arachni/reactor.rb', line 72
#
# @return [Numeric, nil] amount of time to wait for a connection, as taken
#   from the :max_tick_interval option (0.02 by default).
def max_tick_interval
  @max_tick_interval
end
#ticks ⇒ Integer (readonly)
Returns Amount of ticks.
80 81 82 |
# File 'lib/arachni/reactor.rb', line 80
#
# @return [Integer] amount of ticks; #run increments it once per completed
#   loop iteration and resets it to 0 when the loop ends.
def ticks
  @ticks
end
Class Method Details
.global ⇒ Reactor
Returns Lazy-loaded, globally accessible Reactor.
91 92 93 |
# File 'lib/arachni/reactor.rb', line 91
#
# @return [Reactor] lazy-loaded, globally accessible Reactor; the instance is
#   fetched from Global.instance on first use and memoized until .stop
#   clears it.
def global
  @reactor ||= Global.instance
end
.jruby? ⇒ Boolean
118 119 120 |
# File 'lib/arachni/reactor.rb', line 118
#
# @return [Boolean] true when RUBY_PLATFORM is 'java', false otherwise.
def jruby?
  RUBY_PLATFORM == 'java'
end
.stop ⇒ Object
Stops the global Reactor instance and destroys it. The next call to global will return a new instance.
97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/arachni/reactor.rb', line 97
#
# Stops the global Reactor instance and destroys it. The next call to
# {.global} will return a new instance.
#
# Does nothing when no global instance has been requested yet.
def stop
  return if !@reactor

  # Only "not running" is expected here. The former inline `rescue` modifier
  # swallowed every StandardError, hiding real failures.
  begin
    global.stop
  rescue Error::NotRunning
    # Already stopped, there is no loop to shut down.
  end

  # Admittedly not the cleanest solution, but that's the only way to
  # force a Singleton to re-initialize -- and we want the Singleton to
  # cleanly implement the pattern in a Thread-safe way.
  global.class.instance_variable_set(:@singleton__instance__, nil)

  @reactor = nil
end
.supports_unix_sockets? ⇒ Boolean
110 111 112 113 114 115 116 |
# File 'lib/arachni/reactor.rb', line 110
#
# @return [Boolean] false on JRuby or when the UNIXSocket constant cannot be
#   resolved, true otherwise.
def supports_unix_sockets?
  return false if jruby?

  # Referencing the constant raises NameError where it is not defined.
  !!UNIXSocket
rescue NameError
  false
end
Instance Method Details
#at_interval(interval, &block) ⇒ Object
Time accuracy cannot be guaranteed.
454 455 456 457 458 |
# File 'lib/arachni/reactor.rb', line 454
#
# Queues a Tasks::Periodic built from `interval` and the block. Time accuracy
# cannot be guaranteed.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param interval [Object] passed as-is to Tasks::Periodic.new.
# @param block [Proc] passed as-is to Tasks::Periodic.new.
# @return [nil]
def at_interval( interval, &block )
  fail_if_not_running
  @tasks << Tasks::Periodic.new( interval, &block )
  nil
end
#attach(connection) ⇒ Object
Attaches a connection to the Arachni::Reactor loop.
497 498 499 500 501 502 503 504 505 |
# File 'lib/arachni/reactor.rb', line 497
#
# Attaches a connection to the Arachni::Reactor loop. Does nothing if the
# connection is already attached.
#
# The work goes through #schedule, so it either runs immediately (caller is
# on the loop thread) or on the next tick.
#
# @param connection [Connection] must respond to `reactor=`, `to_io` and
#   `on_attach`.
def attach( connection )
  return if attached? connection

  schedule do
    connection.reactor = self
    @connections[connection.to_io] = connection
    connection.on_attach
  end
end
#attached?(connection) ⇒ Bool
Returns true
if the connection is attached, false
otherwise.
526 527 528 |
# File 'lib/arachni/reactor.rb', line 526
#
# @param connection [Connection] looked up by its `to_io` value.
# @return [Bool] true if the connection is attached, false otherwise.
def attached?( connection )
  @connections.include? connection.to_io
end
#connect(host, port, handler = Connection, *handler_options) ⇒ Connection #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the handler's Arachni::Reactor::Connection::Callbacks#on_close method as a reason argument.
Connects to a peer.
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/arachni/reactor.rb', line 190
#
# Connects to a peer.
#
# Accepts either `(host, port, handler = Connection, *handler_options)` or
# `(unix_socket, handler = Connection, *handler_options)`. Connection errors
# are not raised: they are handed to the handler's `close` as the reason.
#
# NOTE(review): the option-parsing helper's name was lost in this listing;
# `determine_connection_options` is a reconstruction -- confirm against
# lib/arachni/reactor.rb.
#
# @param block [Proc] optional, called with the new handler instance before
#   the socket is opened.
# @return [Connection] the handler instance, even when connecting failed.
def connect( *args, &block )
  fail_if_not_running

  options = determine_connection_options( *args )

  connection = options[:handler].new( *options[:handler_options] )
  connection.reactor = self
  block.call connection if block_given?

  begin
    Connection::Error.translate do
      socket = options[:unix_socket] ?
        connect_unix( options[:unix_socket] ) : connect_tcp

      connection.configure options.merge( socket: socket, role: :client )
      attach connection
    end
  rescue Connection::Error => e
    connection.close e
  end

  connection
end
#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator
Returns New Iterator with self
as the scheduler.
154 155 156 |
# File 'lib/arachni/reactor.rb', line 154
#
# @param list [Object] collection handed to the iterator.
# @param concurrency [Integer] handed to the iterator, 1 by default.
# @return [Reactor::Iterator] new Iterator with `self` as the scheduler.
def create_iterator( list, concurrency = 1 )
  Reactor::Iterator.new( self, list, concurrency )
end
#create_queue ⇒ Reactor::Queue
Returns New Queue with self
as the scheduler.
160 161 162 |
# File 'lib/arachni/reactor.rb', line 160
#
# @return [Reactor::Queue] new Queue with `self` as the scheduler.
def create_queue
  Reactor::Queue.new self
end
#delay(time, &block) ⇒ Object
Time accuracy cannot be guaranteed.
468 469 470 471 472 |
# File 'lib/arachni/reactor.rb', line 468
#
# Queues a Tasks::Delayed built from `time` and the block. Time accuracy
# cannot be guaranteed.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param time [Object] passed as-is to Tasks::Delayed.new.
# @param block [Proc] passed as-is to Tasks::Delayed.new.
# @return [nil]
def delay( time, &block )
  fail_if_not_running
  @tasks << Tasks::Delayed.new( time, &block )
  nil
end
#detach(connection) ⇒ Object
Detaches a connection from the Arachni::Reactor loop.
514 515 516 517 518 519 520 521 522 |
# File 'lib/arachni/reactor.rb', line 514
#
# Detaches a connection from the Arachni::Reactor loop. Does nothing if the
# connection is not attached.
#
# The work goes through #schedule; `on_detach` fires while the connection is
# still registered and still has its reactor set.
#
# @param connection [Connection] must respond to `on_detach`, `to_io` and
#   `reactor=`.
def detach( connection )
  return if !attached?( connection )

  schedule do
    connection.on_detach
    @connections.delete connection.to_io
    connection.reactor = nil
  end
end
#in_same_thread? ⇒ Bool
Returns true
if the caller is in the same #thread as the reactor loop,
false
otherwise.
485 486 487 488 |
# File 'lib/arachni/reactor.rb', line 485
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @return [Bool] true if the caller is in the same #thread as the reactor
#   loop, false otherwise.
def in_same_thread?
  fail_if_not_running
  Thread.current == thread
end
#listen(host, port, handler = Connection, *handler_options) ⇒ Connection #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the handler's Arachni::Reactor::Connection::Callbacks#on_close method as a reason argument.
Listens for incoming connections.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/arachni/reactor.rb', line 248
#
# Listens for incoming connections.
#
# Accepts either `(host, port, handler = Connection, *handler_options)` or
# `(unix_socket, handler = Connection, *handler_options)`. Connection errors
# are not raised: they are handed to the server handler's `close` as the
# reason.
#
# NOTE(review): the option-parsing helper's name was lost in this listing;
# `determine_connection_options` is a reconstruction -- confirm against
# lib/arachni/reactor.rb.
#
# @param block [Proc] optional, called with every handler instance built by
#   the factory below (the listening one included).
# @return [Connection] the listening handler instance, even when binding
#   failed.
def listen( *args, &block )
  fail_if_not_running

  options = determine_connection_options( *args )

  # Factory for handler instances. It builds the server below and is also
  # passed on through the :server_handler configuration entry.
  server_handler = proc do
    c = options[:handler].new( *options[:handler_options] )
    c.reactor = self
    block.call c if block_given?
    c
  end

  server = server_handler.call

  begin
    Connection::Error.translate do
      socket = options[:unix_socket] ?
        listen_unix( options[:unix_socket] ) :
        listen_tcp( options[:host], options[:port] )

      server.configure options.merge(
        socket: socket, role: :server, server_handler: server_handler
      )

      attach server
    end
  rescue Connection::Error => e
    server.close e
  end

  server
end
#next_tick(&block) ⇒ Object
440 441 442 443 444 |
# File 'lib/arachni/reactor.rb', line 440
#
# Queues the block as a Tasks::OneOff on the loop's task list.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param block [Proc] passed as-is to Tasks::OneOff.new.
# @return [nil]
def next_tick( &block )
  fail_if_not_running
  @tasks << Tasks::OneOff.new( &block )
  nil
end
#on_error(&block) ⇒ Object
392 393 394 395 396 |
# File 'lib/arachni/reactor.rb', line 392
#
# Registers the block as a Tasks::Persistent error handler. #run calls the
# error handlers with the exception whenever a task pass or connection
# processing raises.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param block [Proc] passed as-is to Tasks::Persistent.new.
# @return [nil]
def on_error( &block )
  fail_if_not_running
  @error_handlers << Tasks::Persistent.new( &block )
  nil
end
#on_shutdown(&block) ⇒ Object
430 431 432 433 434 |
# File 'lib/arachni/reactor.rb', line 430
#
# Registers the block as a Tasks::OneOff shutdown task. #run calls the
# shutdown tasks after the loop has ended and connections were closed.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param block [Proc] passed as-is to Tasks::OneOff.new.
# @return [nil]
def on_shutdown( &block )
  fail_if_not_running
  @shutdown_tasks << Tasks::OneOff.new( &block )
  nil
end
#on_tick(&block) ⇒ Object
402 403 404 405 406 |
# File 'lib/arachni/reactor.rb', line 402
#
# Queues the block as a Tasks::Persistent on the loop's task list.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param block [Proc] passed as-is to Tasks::Persistent.new.
# @return [nil]
def on_tick( &block )
  fail_if_not_running
  @tasks << Tasks::Persistent.new( &block )
  nil
end
#run(&block) ⇒ Object
Starts the Arachni::Reactor loop and blocks the current #thread until #stop is called.
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/arachni/reactor.rb', line 298
#
# Starts the Arachni::Reactor loop and blocks the current #thread until
# #stop is called.
#
# Each iteration runs the queued tasks, then processes connections, then
# counts a tick. Errors raised by either step are passed to the error
# handlers instead of ending the loop. Once the loop ends, tasks are cleared,
# connections are closed and the shutdown tasks are called.
#
# NOTE(review): fail_if_running is defined outside this listing; presumably
# it raises when the loop is already running -- confirm.
#
# @param block [Proc] optional, called on the loop thread right before the
#   first iteration.
def run( &block )
  fail_if_running

  @done_signal.clear

  # A previous #stop leaves the flag set. Without clearing it, a second run
  # on the same instance would exit after its first task pass.
  @stop   = false
  @thread = Thread.current

  begin
    block.call if block_given?

    loop do
      begin
        @tasks.call
      rescue => e
        @error_handlers.call( e )
      end
      break if @stop

      begin
        process_connections
      rescue => e
        @error_handlers.call( e )
      end
      break if @stop

      @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call
  ensure
    # Always leave the "not running" state behind and release #wait callers,
    # even when the start-up block or a handler raised.
    @ticks  = 0
    @thread = nil

    @done_signal << nil
  end

  # Same return value as before the ensure was introduced.
  @done_signal
end
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the
given block
executes and then #stops it.
377 378 379 380 381 382 383 384 385 |
# File 'lib/arachni/reactor.rb', line 377
#
# Starts the Reactor loop, blocks the current #thread while the given block
# executes and then #stop's it.
#
# The stop is queued with #next_tick from inside the start-up block, so it
# takes effect during the loop's first task pass.
#
# @param block [Proc] required.
# @raise [ArgumentError] when no block is given.
def run_block( &block )
  fail ArgumentError, 'Missing block.' if !block_given?
  fail_if_running

  run do
    block.call
    next_tick { stop }
  end
end
#run_in_thread(&block) ⇒ Thread
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# File 'lib/arachni/reactor.rb', line 344
#
# Runs the loop in a new Thread and returns once it is up. Errors escaping
# #run are passed to the error handlers.
#
# NOTE(review): fail_if_running is defined outside this listing; presumably
# it raises when the loop is already running -- confirm.
#
# @param block [Proc] optional, forwarded to #run.
# @return [Thread, nil] the current value of #thread, which is nil when the
#   loop has already ended by the time this returns.
def run_in_thread( &block )
  fail_if_running

  runner = Thread.new do
    begin
      run( &block )
    rescue => e
      @error_handlers.call( e )
    end
  end

  # Poll until the loop reports itself as running, but give up as soon as the
  # spawned thread is dead. Otherwise a start-up failure (or a loop that
  # stopped before being observed) would leave the caller sleeping forever.
  sleep 0.1 until running? || !runner.alive?

  thread
end
#running? ⇒ Bool
Returns true
if the Arachni::Reactor is running, false
otherwise.
280 281 282 |
# File 'lib/arachni/reactor.rb', line 280
#
# @return [Bool] true if the Arachni::Reactor is running (it has a loop
#   #thread and that thread is alive), false otherwise.
def running?
  # Read #thread once: #run clears it from the loop thread when it finishes,
  # so a second read could return nil right after the first one passed.
  loop_thread = thread
  !!( loop_thread && loop_thread.alive? )
end
#schedule(&block) ⇒ Object
414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/arachni/reactor.rb', line 414
#
# Runs the block on the loop thread: immediately when the caller is already
# on it, otherwise by queueing it with #next_tick.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @param block [Proc] the work to run.
# @return [nil]
def schedule( &block )
  fail_if_not_running

  if in_same_thread?
    block.call
  else
    next_tick( &block )
  end

  nil
end
#stop ⇒ Object
Stops the Arachni::Reactor loop as soon as possible.
287 288 289 |
# File 'lib/arachni/reactor.rb', line 287
#
# Stops the Arachni::Reactor loop as soon as possible.
#
# Only raises the stop flag (through #schedule); #run checks it after each
# task pass and after each round of connection processing.
def stop
  schedule { @stop = true }
end
#thread ⇒ Thread?
Returns Thread of the loop, nil
if not running.
476 477 478 |
# File 'lib/arachni/reactor.rb', line 476
#
# @return [Thread, nil] thread of the loop, nil if not running.
def thread
  @thread
end
#wait ⇒ Object
Waits for the Reactor to stop #running?.
363 364 365 366 367 368 |
# File 'lib/arachni/reactor.rb', line 363
#
# Waits for the Reactor to stop #running?, by blocking on the queue that #run
# pushes to when it finishes.
#
# NOTE(review): fail_if_not_running is defined outside this listing;
# presumably it raises when the loop is not running -- confirm.
#
# @return [true]
def wait
  fail_if_not_running

  @done_signal.pop
  true
end