Class: Ione::Io::IoReactor
- Inherits:
-
Object
- Object
- Ione::Io::IoReactor
- Defined in:
- lib/ione/io/io_reactor.rb
Overview
An IO reactor takes care of all the IO for a client. It handles opening new connections, and making sure that connections that have data to send flush to the network, and connections that have data coming in read that data and delegate it to their protocol handlers.
All IO is done in a single background thread, regardless of how many connections you open. There shouldn't be any problems handling hundreds of connections if needed. All operations are thread safe, but you should take great care when in your protocol handlers to make sure that they don't do too much work in their data handling callbacks, since those will be run in the reactor thread, and every cycle you use there is a cycle which can't be used to handle IO.
The IO reactor is completely protocol agnostic, and it's up to you to create objects that can interpret the bytes received from remote hosts, and to send the correct commands back. The way this works is that when you open a connection you can provide a protocol handler factory as a block, (or you can simply wrap the returned connection). This factory can be used to create objects that wrap the raw connections and register to receive new data, and it can write data to connection. It can also register to be notified when the socket is closed, or it can itself close the socket.
Constant Summary collapse
- PENDING_STATE =
0
- RUNNING_STATE =
1
- CRASHED_STATE =
2
- STOPPING_STATE =
3
- STOPPED_STATE =
4
Instance Method Summary collapse
-
#bind(host, port, options = nil) {|the| ... } ⇒ Ione::Future
Starts a server bound to the specified host and port.
-
#cancel_timer(timer_future) ⇒ Object
Cancels a previously scheduled timer.
-
#connect(host, port, options = nil) {|connection| ... } ⇒ Ione::Future
Opens a connection to the specified host and port.
-
#initialize(options = {}) ⇒ IoReactor
constructor
Initializes a new IO reactor.
-
#on_error {|error| ... } ⇒ Object
Register to receive notifications when the reactor shuts down because of an irrecoverable error.
-
#running? ⇒ Boolean
Returns true as long as the reactor is running.
-
#schedule_timer(timeout) ⇒ Ione::Future
Returns a future that completes after the specified number of seconds.
-
#start ⇒ Ione::Future
Starts the reactor.
-
#stop ⇒ Ione::Future
Stops the reactor.
- #to_s ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ IoReactor
Initializes a new IO reactor.
93 94 95 96 97 98 99 100 101 102 |
# File 'lib/ione/io/io_reactor.rb', line 93 def initialize(={}) @options = @clock = [:clock] || Time @state = PENDING_STATE @error_listeners = [] @unblocker = nil @io_loop = IoLoopBody.new(@options) @scheduler = Scheduler.new(@options) @lock = Mutex.new end |
Instance Method Details
#bind(host, port, options = nil) {|the| ... } ⇒ Ione::Future
Starts a server bound to the specified host and port.
A server is represented by an Acceptor, which wraps the server socket and accepts client connections. By registering to be notified on new connections, via Acceptor#on_accept, you can attach your server handling code to a connection.
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/ione/io/io_reactor.rb', line 309 def bind(host, port, =nil, &block) if .is_a?(Integer) || .nil? backlog = || 5 ssl_context = nil elsif backlog = [:backlog] || 5 ssl_context = [:ssl] end if ssl_context server = SslAcceptor.new(host, port, backlog, @unblocker, self, ssl_context) else server = Acceptor.new(host, port, backlog, @unblocker, self) end f = server.bind @io_loop.add_socket(server) @unblocker.unblock if running? f = f.map(&block) if block_given? f end |
#cancel_timer(timer_future) ⇒ Object
Cancels a previously scheduled timer.
The timer will fail with a CancelledError.
350 351 352 |
# File 'lib/ione/io/io_reactor.rb', line 350 def cancel_timer(timer_future) @scheduler.cancel_timer(timer_future) end |
#connect(host, port, options = nil) {|connection| ... } ⇒ Ione::Future
Opens a connection to the specified host and port.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/ione/io/io_reactor.rb', line 228 def connect(host, port, =nil, &block) if .is_a?(Numeric) || .nil? timeout = || 5 ssl = false elsif timeout = [:timeout] || 5 ssl = [:ssl] end connection = Connection.new(host, port, timeout, @unblocker, @clock) f = connection.connect @io_loop.add_socket(connection) @unblocker.unblock if running? if ssl f = f.flat_map do ssl_context = ssl == true ? nil : ssl upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, ssl_context) ff = upgraded_connection.connect @io_loop.remove_socket(connection) @io_loop.add_socket(upgraded_connection) @unblocker.unblock ff end end f = f.map(&block) if block_given? f end |
#on_error {|error| ... } ⇒ Object
Register to receive notifications when the reactor shuts down because of an irrecoverable error.
The listener block will be called in the reactor thread. Any errors that it raises will be ignored.
111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/ione/io/io_reactor.rb', line 111 def on_error(&listener) @lock.lock begin @error_listeners = @error_listeners.dup @error_listeners << listener ensure @lock.unlock end if @state == RUNNING_STATE || @state == CRASHED_STATE @stopped_promise.future.on_failure(&listener) end end |
#running? ⇒ Boolean
127 128 129 |
# File 'lib/ione/io/io_reactor.rb', line 127 def running? (state = @state) == RUNNING_STATE || state == STOPPING_STATE end |
#schedule_timer(timeout) ⇒ Ione::Future
Returns a future that completes after the specified number of seconds.
340 341 342 |
# File 'lib/ione/io/io_reactor.rb', line 340 def schedule_timer(timeout) @scheduler.schedule_timer(timeout) end |
#start ⇒ Ione::Future
Starts the reactor. This will spawn a background thread that will manage all connections.
This method is asynchronous and returns a future which completes when the reactor has started.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/ione/io/io_reactor.rb', line 138 def start @lock.synchronize do if @state == RUNNING_STATE return @started_promise.future elsif @state == STOPPING_STATE return @stopped_promise.future.flat_map { start }.fallback { start } else @state = RUNNING_STATE end end @unblocker = Unblocker.new @io_loop.add_socket(@unblocker) @started_promise = Promise.new @stopped_promise = Promise.new @error_listeners.each do |listener| @stopped_promise.future.on_failure(&listener) end Thread.start do @started_promise.fulfill(self) error = nil begin while @state == RUNNING_STATE @io_loop.tick @scheduler.tick end rescue => e error = e ensure begin @io_loop.close_sockets @scheduler.cancel_timers @unblocker = nil ensure if error @state = CRASHED_STATE @stopped_promise.fail(error) else @state = STOPPED_STATE @stopped_promise.fulfill(self) end end end end @started_promise.future end |
#stop ⇒ Ione::Future
Stops the reactor.
This method is asynchronous and returns a future which completes when the reactor has completely stopped, or fails with an error if the reactor stops or has already stopped because of a failure.
191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/ione/io/io_reactor.rb', line 191 def stop @lock.synchronize do if @state == PENDING_STATE Future.resolved(self) elsif @state != STOPPED_STATE && @state != CRASHED_STATE @state = STOPPING_STATE @stopped_promise.future else @stopped_promise.future end end end |
#to_s ⇒ Object
354 355 356 357 358 359 360 |
# File 'lib/ione/io/io_reactor.rb', line 354 def to_s state_constant_name = self.class.constants.find do |name| name.to_s.end_with?('_STATE') && self.class.const_get(name) == @state end state = state_constant_name.to_s.rpartition('_').first %(#<#{self.class.name} #{state}>) end |