Class: ScripTTY::Net::EventLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/scriptty/net/event_loop.rb

Defined Under Namespace

Classes: ConnectionWrapper, ListeningSocketWrapper, OutgoingConnectionWrapper, SocketChannelWrapper, Timer

Constant Summary collapse

DEBUG =
false

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventLoop

Returns a new instance of EventLoop.



44
45
46
47
48
49
50
51
# File 'lib/scriptty/net/event_loop.rb', line 44

# Set up an idle event loop: open the selector that main waits on, allocate
# a 4096-byte buffer (@read_buffer), and start with an empty timer queue and
# all state flags cleared.
def initialize
  @selector = Selector.open
  @read_buffer = ByteBuffer.allocate(4096)
  @exit_mutex = Mutex.new   # protects @exit_requested
  @exit_requested = false
  # Set by suspend, cleared by resume, and read by main; initialised here so
  # it is never read before being assigned.
  @suspended = false
  @timer_queue = []    # sorted list of timers, in ascending order of expire_at time
  @done = false
end

Instance Method Details

#connect(address) ⇒ Object

Initiate a TCP connection to the specified address (given as [host, port])

If a block is given, it will be passed the OutgoingConnectionWrapper object, and the result of the block will be returned. Otherwise, the OutgoingConnectionWrapper object is returned.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/scriptty/net/event_loop.rb', line 129

# Initiate a non-blocking TCP connection to address (given as [host, port])
# and register the channel with this loop's selector.
#
# If a block is given, it is passed the OutgoingConnectionWrapper object and
# the result of the block is returned. Otherwise, the
# OutgoingConnectionWrapper object is returned.
#
# If any setup step raises before the wrapper has been attached to the
# channel's selection key, the channel is closed before the exception
# propagates, so a failed connect does not leave an open socket behind.
def connect(address)
  connect_address = EventLoop.parse_address(address)
  chan = SocketChannel.open
  registered = false
  begin
    chan.configureBlocking(false)
    chan.socket.setOOBInline(true)    # Receive TCP URGent data (but not the fact that it's urgent) in-band
    chan.connect(connect_address)
    cw = OutgoingConnectionWrapper.new(self, chan, address)
    # We want OP_CONNECT here (and OP_READ after the connection is
    # established) so that we can tell when the connection is
    # established/dropped, even if the user does not specify any
    # on_connect or on_close/on_read_bytes callbacks.
    chan.register(@selector, SelectionKey::OP_CONNECT)
    selection_key = chan.keyFor(@selector)   # SelectionKey object
    check_glassfish_issue3027(selection_key)
    selection_key.attach({:connection_wrapper => cw})
    registered = true
  ensure
    # registered is still false here only when one of the steps above raised.
    chan.close unless registered
  end
  if block_given?
    yield cw
  else
    cw
  end
end

#done? ⇒ Boolean

Return true if the event loop is done executing.

Returns:

  • (Boolean)


64
65
66
# File 'lib/scriptty/net/event_loop.rb', line 64

# Return true if the event loop is done executing.
#
# Simply reports @done, which starts out false in initialize and is set to
# true by main's cleanup after the selector has been closed.
def done?
  @done
end

#exit ⇒ Object

Instruct the main loop to exit. Returns immediately.

  • This method may safely be called from any thread.

  • This method may safely be invoked multiple times.



57
58
59
60
61
# File 'lib/scriptty/net/event_loop.rb', line 57

# Ask the main loop to stop, then return immediately (always nil).
#
# The exit flag is written while holding @exit_mutex, the same mutex main
# holds when it reads the flag. Invoking this more than once just sets the
# flag again.
def exit
  @exit_mutex.synchronize do
    @exit_requested = true
  end
  # Wake the selector so a select call that is blocking in main returns and
  # the loop gets to see the flag.
  @selector.wakeup
  nil
end

#listen(address, options = {}, &block) ⇒ Object

Listen for TCP connections on the specified address (given as [host, port])

If port 0 is specified, the operating system will choose a port.

If a block is given, it will be passed the ListeningSocketWrapper object, and the result of the block will be returned. Otherwise, the ListeningSocketWrapper object is returned.

Options:

:multiple

If true, then the parameter is a list of addresses and the block (if any) will be invoked once for each one. The return value will be an array with one entry per address: the ListeningSocketWrapper object when no block is given, or the block's result when one is.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/scriptty/net/event_loop.rb', line 81

# Listen for TCP connections on the specified address (given as
# [host, port]) and register the listening channel with this loop's selector.
#
# If a block is given, it is passed the ListeningSocketWrapper object and the
# result of the block is returned. Otherwise, the ListeningSocketWrapper
# object is returned.
#
# Options:
# [:multiple]
#   If true, address is a list of addresses; listen is invoked once per
#   address (with :multiple removed) and an array of the individual results
#   is returned.
#
# If binding or registration raises, the freshly opened channel is closed
# before the exception propagates, so a failed listen (for example on a port
# that is already in use) does not leave an open socket behind.
def listen(address, options={}, &block)
  if options[:multiple]
    # address is actually a list of addresses
    options = options.dup
    options.delete(:multiple)
    return address.map{ |addr| listen(addr, options, &block) }
  end
  bind_address = EventLoop.parse_address(address)
  schan = ServerSocketChannel.open
  registered = false
  begin
    schan.configureBlocking(false)
    schan.socket.bind(bind_address)
    lw = ListeningSocketWrapper.new(self, schan)
    # We want OP_ACCEPT here (and later, OP_READ), so that we can tell
    # when the connection is established/dropped, even if the user does
    # not specify any on_accept or on_close/on_read_bytes callbacks.
    schan.register(@selector, SelectionKey::OP_ACCEPT)
    selection_key = schan.keyFor(@selector)   # SelectionKey object
    check_glassfish_issue3027(selection_key)
    selection_key.attach({:listening_socket_wrapper => lw})
    registered = true
  ensure
    # registered is still false here only when one of the steps above raised.
    schan.close unless registered
  end
  if block
    block.call(lw)
  else
    lw
  end
end

#main ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/scriptty/net/event_loop.rb', line 178

# Run the event loop until exit has been requested, there is nothing left to
# wait for (no selector keys and no non-daemon timers), or suspend is invoked.
#
# Each iteration waits on the selector (bounded by the earliest timer, if
# any), runs the callbacks of all expired timers, and then passes every
# selected key to handle_selection_key.
#
# Returns :suspended if the loop was interrupted by suspend, otherwise nil.
#
# Raises ArgumentError if the loop is currently suspended (use resume
# instead) or has already finished.
#
# Unless the loop is suspended or already done, leaving this method --
# normally or through an exception -- closes the channel of every key still
# registered with the selector, closes the selector, and sets @done.
def main
  raise ArgumentError.new("use the resume method when suspended") if @suspended
  raise ArgumentError.new("Already done") if @done
  loop do
    # Exit if the "exit" method has been invoked.  The flag is read under the
    # same mutex that exit holds while setting it.
    break if @exit_mutex.synchronize{ @exit_requested }
    # Exit if there are no active connections and no non-daemon timers
    break if (@selector.keys.empty? and (@timer_queue.empty? or @timer_queue.map{|t| t.daemon?}.all?))

    # If there are any timers, schedule a wake-up for when the first
    # timer expires.  timeout_millis ends up as one of:
    #   positive integer - wait at most that many milliseconds
    #   0                - no timers at all: wait until something happens
    #   nil              - first timer is already due: do not wait
    # NOTE(review): to_i truncates, so a timer due in less than 1 ms takes the
    # non-blocking path and the loop may spin briefly until it expires --
    # confirm whether rounding up would be preferable.
    next_timer = @timer_queue.first
    if next_timer
      timeout_millis = (1000 * (next_timer.expire_at - Time.now)).to_i
      timeout_millis = nil if timeout_millis <= 0
    else
      timeout_millis = 0   # sleep indefinitely
    end

    # select(), unless the timeout has already expired
    puts "SELECT: to=#{timeout_millis.inspect} kk=#{@selector.keys.to_a.map{|k|k.attachment}.inspect} tt=#{@timer_queue.length}" if DEBUG
    if timeout_millis
      # Return when something happens, or after timeout (if timeout_millis is non-zero)
      @selector.select(timeout_millis)
    else
      # Non-blocking select
      @selector.selectNow
    end
    puts "DONE SELECT" if DEBUG

    # Invoke the callbacks for any expired timers.  "now" is sampled once, so
    # timers queued by a callback only fire in this pass if they already
    # expire at or before that instant.
    now = Time.now
    until @timer_queue.empty? or now < @timer_queue.first.expire_at
      timer = @timer_queue.shift
      # callback is invoked through send; presumably it is not a public
      # method of Timer -- confirm against the Timer class.
      timer.send(:callback).call
    end
    # Presumably clears the reference to the last fired timer -- confirm.
    timer = nil

    # Handle channels that are ready for I/O operations.  Iterate over a copy
    # (to_a) because each key is removed from the selected set once handled.
    @selector.selectedKeys.to_a.each do |k|
      handle_selection_key(k)
      @selector.selectedKeys.remove(k)
    end

    # Break out of the loop if the suspend method has been invoked.   # TODO - test me
    return :suspended if @suspended
  end
  nil
ensure
  # Skipped while suspended (resume re-enters main and needs the selector
  # intact) and when already done (nothing left to close).
  unless @suspended or @done
    @selector.keys.to_a.each { |k| k.channel.close }    # Close any sockets opened by this EventLoop
    @selector.close
    @done = true
  end
end

#on_accept(address, options = {}, &callback) ⇒ Object

Convenience method: Listen for TCP connections on a particular address (given as [host, port]), and invoke the given block when a connection is received.

If port 0 is specified, the operating system will choose a port.

Returns the ListeningSocketWrapper.

Options:

:multiple

If true, then the parameter is a list of addresses, and multiple ListeningSocketWrapper objects will be returned as an array.

Raises:

  • (ArgumentError)


119
120
121
122
# File 'lib/scriptty/net/event_loop.rb', line 119

# Convenience wrapper around listen: start listening on address (given as
# [host, port], or a list of them when options[:multiple] is true) and
# install callback as the listener's on_accept handler.
#
# The return value is whatever listen returns for the block used here, i.e.
# the result of the listener's own on_accept call (an array of such results
# with :multiple).
#
# Raises ArgumentError when called without a block.
def on_accept(address, options={}, &callback)
  raise ArgumentError, "no block given" if callback.nil?
  listen(address, options) do |listener|
    listener.on_accept(&callback)
  end
end

#on_connect(address, &callback) ⇒ Object

Convenience method: Initiate a TCP connection to the specified address (given as [host, port]) and invoke the given block when a connection is made.

Returns the OutgoingConnectionWrapper that will be connected to.

Raises:

  • (ArgumentError)


156
157
158
159
# File 'lib/scriptty/net/event_loop.rb', line 156

# Convenience wrapper around connect: initiate a TCP connection to address
# (given as [host, port]) and install callback as the connection's
# on_connect handler.
#
# The return value is whatever connect returns for the block used here, i.e.
# the result of the wrapper's own on_connect call.
#
# Raises ArgumentError when called without a block.
def on_connect(address, &callback)
  raise ArgumentError, "no block given" if callback.nil?
  connect(address) do |conn|
    conn.on_connect(&callback)
  end
end

#resume ⇒ Object

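Resume an event loop that was previously paused with the suspend method.

Always use this method (not main) to continue after suspend. It clears the suspended state and re-enters the main loop, returning whatever main returns.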

Raises:

  • (ArgumentError)


249
250
251
252
253
# File 'lib/scriptty/net/event_loop.rb', line 249

# Continue running a loop that was paused with suspend: clear the suspended
# flag and re-enter main, returning whatever main returns.
#
# Raises ArgumentError if the loop is not currently suspended.
def resume
  unless @suspended
    raise ArgumentError, "not suspended"
  end
  @suspended = false
  main
end

#suspend ⇒ Object

Temporarily break out of the event loop.

NOTE: Always use the resume method after using this method, since otherwise network connections will hang. This method is NOT thread-safe.

To exit the event loop permanently, use the exit method.



241
242
243
244
# File 'lib/scriptty/net/event_loop.rb', line 241

# Temporarily break out of the event loop.
#
# Sets @suspended and wakes the selector; main checks the flag at the end of
# each iteration and then returns :suspended without closing any channels.
#
# NOTE: Always use the resume method after using this method, since
# otherwise network connections will hang.  This method is NOT thread-safe.
#
# To exit the event loop permanently, use the exit method.
#
# Returns the result of @selector.wakeup.
def suspend
  @suspended = true
  @selector.wakeup
end

#timer(delay, options = {}, &callback) ⇒ Object

Invoke the specified callback after the specified number of seconds have elapsed.

Return the ScripTTY::Net::EventLoop::Timer object for the timer.

Raises:

  • (ArgumentError)


165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/scriptty/net/event_loop.rb', line 165

# Schedule callback to be invoked once delay seconds from now.
#
# A Timer is built with this loop, the computed expiry time, the callback and
# the options hash, then inserted into @timer_queue so that the queue stays
# sorted by ascending expire_at.  A timer whose expiry equals that of timers
# already queued is placed after them.
#
# Returns the new Timer object.
# Raises ArgumentError when called without a block.
def timer(delay, options={}, &callback)
  raise ArgumentError, "no block given" if callback.nil?
  fresh = Timer.new(self, Time.now + delay, callback, options)
  # Insert ahead of the first queued timer that expires strictly later; if
  # there is none, append at the end.
  slot = @timer_queue.index { |queued| queued.expire_at > fresh.expire_at }
  @timer_queue.insert(slot || @timer_queue.length, fresh)
  # Wake the selector so a blocked select in main can recompute its timeout.
  @selector.wakeup
  fresh
end