Class: ScripTTY::Net::EventLoop
- Inherits:
- Object
- Defined in:
- lib/scriptty/net/event_loop.rb
Defined Under Namespace
Classes: ConnectionWrapper, ListeningSocketWrapper, OutgoingConnectionWrapper, SocketChannelWrapper, Timer
Constant Summary
- DEBUG = false
Instance Method Summary
- #connect(address) ⇒ Object
  Initiate a TCP connection to the specified address (given as [host, port]).
- #done? ⇒ Boolean
  Return true if the event loop is done executing.
- #exit ⇒ Object
  Instruct the main loop to exit.
- #initialize ⇒ EventLoop (constructor)
  A new instance of EventLoop.
- #listen(address, options = {}, &block) ⇒ Object
  Listen for TCP connections on the specified address (given as [host, port]).
- #main ⇒ Object
- #on_accept(address, options = {}, &callback) ⇒ Object
  Convenience method: Listen for TCP connections on a particular address (given as [host, port]), and invoke the given block when a connection is received.
- #on_connect(address, &callback) ⇒ Object
  Convenience method: Initiate a TCP connection to the specified address (given as [host, port]) and invoke the given block when a connection is made.
- #resume ⇒ Object
  Resume a suspended event loop.
- #suspend ⇒ Object
  Temporarily break out of the event loop.
- #timer(delay, options = {}, &callback) ⇒ Object
  Invoke the specified callback after the specified number of seconds have elapsed.
Constructor Details
#initialize ⇒ EventLoop
Returns a new instance of EventLoop.
# File 'lib/scriptty/net/event_loop.rb', line 44
def initialize
  @selector = Selector.open
  @read_buffer = ByteBuffer.allocate(4096)
  @exit_mutex = Mutex.new   # protects @exit_requested
  @exit_requested = false
  @timer_queue = []    # sorted list of timers, in ascending order of expire_at time
  @done = false
end
Instance Method Details
#connect(address) ⇒ Object
Initiate a TCP connection to the specified address (given as [host, port]).
If a block is given, it will be passed the OutgoingConnectionWrapper object, and the result of the block will be returned. Otherwise, the OutgoingConnectionWrapper object is returned.
# File 'lib/scriptty/net/event_loop.rb', line 129
def connect(address)
  connect_address = EventLoop.parse_address(address)
  chan = SocketChannel.open
  chan.configureBlocking(false)
  chan.socket.setOOBInline(true)    # Receive TCP URGent data (but not the fact that it's urgent) in-band
  chan.connect(connect_address)
  cw = OutgoingConnectionWrapper.new(self, chan, address)
  # We want OP_CONNECT here (and OP_READ after the connection is
  # established) so that we can tell when the connection is
  # established/dropped, even if the user does not specify any
  # on_connect or on_close/on_read_bytes callbacks.
  chan.register(@selector, SelectionKey::OP_CONNECT)
  selection_key = chan.keyFor(@selector)   # SelectionKey object
  check_glassfish_issue3027(selection_key)
  selection_key.attach({:connection_wrapper => cw})
  if block_given?
    yield cw
  else
    cw
  end
end
#done? ⇒ Boolean
Return true if the event loop is done executing.
# File 'lib/scriptty/net/event_loop.rb', line 64
def done?
  @done
end
#exit ⇒ Object
Instruct the main loop to exit. Returns immediately.
- This method may safely be called from any thread.
- This method may safely be invoked multiple times.
# File 'lib/scriptty/net/event_loop.rb', line 57
def exit
  @exit_mutex.synchronize { @exit_requested = true }
  @selector.wakeup
  nil
end
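The thread-safety claim for #exit rests on the mutex around the flag. A minimal sketch of that pattern in isolation follows; the ExitFlag class and its method names are hypothetical and not part of ScripTTY. The real method additionally calls @selector.wakeup so that a blocked select returns promptly, which this sketch leaves out.

```ruby
# Hypothetical miniature of the exit-flag pattern; not the ScripTTY class.
class ExitFlag
  def initialize
    @mutex = Mutex.new      # guards @requested
    @requested = false
  end

  # Safe to call from any thread, any number of times.
  def request
    @mutex.synchronize { @requested = true }
    nil
  end

  # Read the flag under the same lock, as the main loop does.
  def requested?
    @mutex.synchronize { @requested }
  end
end

flag = ExitFlag.new
worker = Thread.new { 3.times { flag.request } }   # repeated requests are harmless
worker.join
puts flag.requested?
```

Setting the flag is idempotent, which is why repeated calls are safe.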
#listen(address, options = {}, &block) ⇒ Object
Listen for TCP connections on the specified address (given as [host, port]).
If port 0 is specified, the operating system will choose a port.
If a block is given, it will be passed the ListeningSocketWrapper object, and the result of the block will be returned. Otherwise, the ListeningSocketWrapper object is returned.
Options:
- :multiple
  If true, then the parameter is a list of addresses and the block (if given) is invoked once for each one. The return value is an array of ListeningSocketWrapper objects, or an array of the block's results when a block is given.
# File 'lib/scriptty/net/event_loop.rb', line 81
def listen(address, options={}, &block)
  if options[:multiple]
    # address is actually a list of addresses
    options = options.dup
    options.delete(:multiple)
    return address.map{ |addr| listen(addr, options, &block) }
  end
  bind_address = EventLoop.parse_address(address)
  schan = ServerSocketChannel.open
  schan.configureBlocking(false)
  schan.socket.bind(bind_address)
  lw = ListeningSocketWrapper.new(self, schan)
  # We want OP_ACCEPT here (and later, OP_READ), so that we can tell
  # when the connection is established/dropped, even if the user does
  # not specify any on_accept or on_close/on_read_bytes callbacks.
  schan.register(@selector, SelectionKey::OP_ACCEPT)
  selection_key = schan.keyFor(@selector)   # SelectionKey object
  check_glassfish_issue3027(selection_key)
  selection_key.attach({:listening_socket_wrapper => lw})
  if block
    block.call(lw)
  else
    lw
  end
end
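The :multiple handling can be tried without any sockets. The sketch below is only an illustration: sketch_listen is a hypothetical stand-in that returns the address itself where the real method would return a ListeningSocketWrapper, and port 8023 is an arbitrary example value.

```ruby
# Hypothetical stand-in for EventLoop#listen: it returns the address itself
# instead of a ListeningSocketWrapper so the control flow can run anywhere.
def sketch_listen(address, options = {}, &block)
  if options[:multiple]
    # address is a list; recurse once per entry with :multiple removed
    single_options = options.reject { |key, _| key == :multiple }
    return address.map { |addr| sketch_listen(addr, single_options, &block) }
  end
  # Same tail as the real method: call the block if given, else return the listener
  block ? block.call(address) : address
end

p sketch_listen(["127.0.0.1", 0])
p sketch_listen([["127.0.0.1", 0], ["127.0.0.1", 8023]], :multiple => true)
p sketch_listen([["127.0.0.1", 0]], :multiple => true) { |addr| addr.last }
```

The last call shows that when a block is supplied together with :multiple, the resulting array holds the block's return values, one per address.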
#main ⇒ Object
# File 'lib/scriptty/net/event_loop.rb', line 178
def main
  raise ArgumentError.new("use the resume method when suspended") if @suspended
  raise ArgumentError.new("Already done") if @done
  loop do
    # Exit if the "exit" method has been invoked.
    break if @exit_mutex.synchronize{ @exit_requested }
    # Exit if there are no active connections and no non-daemon timers
    break if (@selector.keys.empty? and (@timer_queue.empty? or @timer_queue.map{|t| t.daemon?}.all?))

    # If there are any timers, schedule a wake-up for when the first
    # timer expires.
    next_timer = @timer_queue.first
    if next_timer
      timeout_millis = (1000 * (next_timer.expire_at - Time.now)).to_i
      timeout_millis = nil if timeout_millis <= 0
    else
      timeout_millis = 0    # sleep indefinitely
    end

    # select(), unless the timeout has already expired
    puts "SELECT: to=#{timeout_millis.inspect} kk=#{@selector.keys.to_a.map{|k|k.}.inspect} tt=#{@timer_queue.length}" if DEBUG
    if timeout_millis
      # Return when something happens, or after timeout (if timeout_millis is non-zero)
      @selector.select(timeout_millis)
    else
      # Non-blocking select
      @selector.selectNow
    end
    puts "DONE SELECT" if DEBUG

    # Invoke the callbacks for any expired timers
    now = Time.now
    until @timer_queue.empty? or now < @timer_queue.first.expire_at
      timer = @timer_queue.shift
      timer.send(:callback).call
    end
    timer = nil

    # Handle channels that are ready for I/O operations
    @selector.selectedKeys.to_a.each do |k|
      handle_selection_key(k)
      @selector.selectedKeys.remove(k)
    end

    # Break out of the loop if the suspend method has been invoked.
    # TODO - test me
    return :suspended if @suspended
  end
  nil
ensure
  unless @suspended or @done
    @selector.keys.to_a.each { |k| k.channel.close }   # Close any sockets opened by this EventLoop
    @selector.close
    @done = true
  end
end
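Two decisions inside #main are easy to misread: the three-way timeout value and the loop's exit condition. The following sketch isolates both as plain functions. The helper names select_timeout_millis and loop_finished? are hypothetical and do not exist in ScripTTY; they only restate the logic of the listing.

```ruby
# Hypothetical helper: choose the select timeout the way EventLoop#main does.
# Returns nil for a non-blocking poll (selectNow), 0 to block until I/O
# arrives, or a positive number of milliseconds to wait.
def select_timeout_millis(next_expire_at, now)
  return 0 if next_expire_at.nil?              # no timers: block indefinitely
  millis = (1000 * (next_expire_at - now)).to_i
  millis <= 0 ? nil : millis                   # timer already due: do not block
end

# Hypothetical helper: the loop ends when no channels are registered and
# every remaining timer is a daemon timer (all? is true for an empty list).
def loop_finished?(key_count, timer_daemon_flags)
  key_count.zero? && timer_daemon_flags.all?
end

now = Time.at(100)
p select_timeout_millis(nil, now)               # no timers queued
p select_timeout_millis(Time.at(100.25), now)   # timer due in a quarter second
p select_timeout_millis(Time.at(99), now)       # timer already expired
p loop_finished?(0, [true, true])               # only daemon timers remain
```

Because the remaining time is truncated with to_i, a timer due in less than one millisecond is treated as already expired and leads to a non-blocking select.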
#on_accept(address, options = {}, &callback) ⇒ Object
Convenience method: Listen for TCP connections on a particular address (given as [host, port]), and invoke the given block when a connection is received.
If port 0 is specified, the operating system will choose a port.
Returns the ListeningSocketWrapper.
Options:
- :multiple
  If true, then the parameter is a list of addresses, and multiple ListeningSocketWrapper objects will be returned as an array.
# File 'lib/scriptty/net/event_loop.rb', line 119
def on_accept(address, options={}, &callback)
  raise ArgumentError.new("no block given") unless callback
  listen(address, options) { |listener| listener.on_accept(&callback) }
end
#on_connect(address, &callback) ⇒ Object
Convenience method: Initiate a TCP connection to the specified address (given as [host, port]) and invoke the given block when a connection is made.
Returns the OutgoingConnectionWrapper that will be connected to.
# File 'lib/scriptty/net/event_loop.rb', line 156
def on_connect(address, &callback)
  raise ArgumentError.new("no block given") unless callback
  connect(address) { |conn| conn.on_connect(&callback) }
end
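The relationship between #connect and #on_connect (yield the wrapper when a block is given, otherwise return it) can be sketched with stand-ins. Everything below is hypothetical: SketchWrapper, sketch_connect and sketch_on_connect are illustrative names, and the sketch assumes the wrapper's on_connect returns the wrapper itself, which the documented return value suggests but this page does not show.

```ruby
# Hypothetical stand-in for OutgoingConnectionWrapper. Assumption: on_connect
# returns the wrapper itself, as the documented return value suggests.
class SketchWrapper
  attr_reader :address, :connect_callbacks

  def initialize(address)
    @address = address
    @connect_callbacks = []
  end

  # Register a callback; nothing is invoked until a connection is made.
  def on_connect(&callback)
    @connect_callbacks << callback
    self
  end
end

# Mirrors EventLoop#connect: yield the wrapper if a block is given, else return it.
def sketch_connect(address)
  wrapper = SketchWrapper.new(address)
  block_given? ? yield(wrapper) : wrapper
end

# Mirrors EventLoop#on_connect: require a callback, then register it via connect.
def sketch_on_connect(address, &callback)
  raise ArgumentError, "no block given" unless callback
  sketch_connect(address) { |conn| conn.on_connect(&callback) }
end

conn = sketch_on_connect(["localhost", 23]) { |c| puts "connected to #{c.address.inspect}" }
puts conn.connect_callbacks.length
```

The callback is only registered here, not run; in the real class it would fire once the event loop observes the completed connection.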
#resume ⇒ Object
Resume a suspended event loop.
Call this method only after suspend; it raises ArgumentError if the event loop is not suspended.
# File 'lib/scriptty/net/event_loop.rb', line 249
def resume
  raise ArgumentError.new("not suspended") unless @suspended
  @suspended = false
  main
end
#suspend ⇒ Object
Temporarily break out of the event loop.
NOTE: Always use the resume method after using this method, since otherwise network connections will hang. This method is NOT thread-safe.
To exit the event loop permanently, use the exit method.
# File 'lib/scriptty/net/event_loop.rb', line 241
def suspend
  @suspended = true
  @selector.wakeup
end
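How #suspend, #resume and #main interact through the @suspended and @done flags can be seen in a toy loop with no sockets. ToyLoop below is a hypothetical class written for illustration only; it counts ticks where the real loop would select on channels.

```ruby
# Hypothetical toy loop (no sockets) mirroring the @suspended/@done state
# handling of EventLoop#main, #suspend and #resume.
class ToyLoop
  attr_reader :ticks

  def initialize(limit, &on_tick)
    @limit = limit
    @on_tick = on_tick
    @ticks = 0
    @suspended = false
    @done = false
  end

  def done?
    @done
  end

  def suspend
    @suspended = true
  end

  def resume
    raise ArgumentError, "not suspended" unless @suspended
    @suspended = false
    main
  end

  def main
    raise ArgumentError, "use the resume method when suspended" if @suspended
    raise ArgumentError, "Already done" if @done
    while @ticks < @limit
      @ticks += 1
      @on_tick.call(self) if @on_tick
      return :suspended if @suspended   # leave the loop but keep state for resume
    end
    nil
  ensure
    # Final cleanup happens only on a real exit, never on suspension.
    @done = true unless @suspended || @done
  end
end

toy = ToyLoop.new(4) { |l| l.suspend if l.ticks == 2 }
p toy.main     # :suspended
p toy.done?    # false
p toy.resume   # nil
p toy.done?    # true
```

A suspended loop skips its cleanup, which is why the real class warns that connections hang unless resume is called afterwards.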
#timer(delay, options = {}, &callback) ⇒ Object
Invoke the specified callback after the specified number of seconds have elapsed.
Returns the ScripTTY::Net::EventLoop::Timer object for the timer.
# File 'lib/scriptty/net/event_loop.rb', line 165
def timer(delay, options={}, &callback)
  raise ArgumentError.new("no block given") unless callback
  new_timer = Timer.new(self, Time.now + delay, callback, options)
  i = 0
  while i < @timer_queue.length   # Insert new timer in the correct sort order
    break if @timer_queue[i].expire_at > new_timer.expire_at
    i += 1
  end
  @timer_queue.insert(i, new_timer)
  @selector.wakeup
  new_timer
end
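The insertion loop in #timer keeps @timer_queue sorted by expire_at. A minimal standalone sketch of the same ordering rule follows, using a hypothetical SketchTimer struct and insert_timer helper (neither is part of ScripTTY). It shows that timers with equal expiry times keep their insertion order.

```ruby
# Hypothetical stand-in for ScripTTY::Net::EventLoop::Timer; only expire_at matters here.
SketchTimer = Struct.new(:name, :expire_at)

# Insert new_timer before the first queued timer that expires strictly later,
# mirroring the while loop in EventLoop#timer.
def insert_timer(queue, new_timer)
  i = queue.index { |t| t.expire_at > new_timer.expire_at } || queue.length
  queue.insert(i, new_timer)
  new_timer
end

queue = []
insert_timer(queue, SketchTimer.new(:b, 2.0))
insert_timer(queue, SketchTimer.new(:a, 1.0))
insert_timer(queue, SketchTimer.new(:c, 2.0))   # same expiry as :b, so it lands after :b
puts queue.map(&:name).inspect
```

The strict comparison is what places a new timer after existing timers with the same expiry, so equal-time callbacks fire in registration order.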