Class: Cql::Io::IoLoopBody

Inherits:
Object
  • Object
show all
Defined in:
lib/cql/io/io_reactor.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ IoLoopBody

Returns a new instance of IoLoopBody.



263
264
265
266
267
268
269
# File 'lib/cql/io/io_reactor.rb', line 263

# Sets up an empty loop body: no sockets, no timers, and a fresh mutex.
#
# @param options [Hash] optional collaborators
# @option options [Object] :selector used in place of IO when given
# @option options [Object] :clock used in place of Time when given
#   (only #now is called on it within this excerpt)
def initialize(options={})
  selector = options[:selector]
  clock = options[:clock]
  # A nil/false entry falls back to the core class.
  @selector = selector || IO
  @clock = clock || Time
  @lock = Mutex.new
  @sockets = []
  @timers = []
end

Instance Method Details

#add_socket(socket) ⇒ Object



271
272
273
274
275
276
277
# File 'lib/cql/io/io_reactor.rb', line 271

# Registers +socket+ with the loop and forgets every socket that reports
# itself closed.
#
# The socket list is rebuilt as a new array and swapped in while holding
# @lock; the previous array is never mutated.
#
# @param socket [#closed?] the socket to start tracking
# @return [Array] the new socket list
def add_socket(socket)
  @lock.synchronize do
    @sockets = @sockets.reject(&:closed?) << socket
  end
end

#cancel_timers ⇒ Object



298
299
300
301
302
303
304
305
# File 'lib/cql/io/io_reactor.rb', line 298

# Fails every pending timer promise with a CancelledError and clears its
# slot so the same promise is not failed twice.
#
# Each entry of @timers is a two-element array whose second element is the
# promise (or nil once it has been cleared).
#
# @return [Array] the @timers array
def cancel_timers
  @timers.each do |entry|
    promise = entry[1]
    next unless promise
    promise.fail(CancelledError.new)
    # Mark the entry as handled.
    entry[1] = nil
  end
end

#close_sockets ⇒ Object



288
289
290
291
292
293
294
295
296
# File 'lib/cql/io/io_reactor.rb', line 288

# Closes every tracked socket that is not already closed.
#
# Errors (StandardError) raised while checking or closing one socket are
# swallowed so that the remaining sockets are still processed.
#
# @return [Array] the @sockets array
def close_sockets
  @sockets.each do |sock|
    begin
      sock.closed? || sock.close
    rescue StandardError
      # the socket had most likely already closed due to an error
    end
  end
end

#schedule_timer(timeout, promise = Promise.new) ⇒ Object



279
280
281
282
283
284
285
286
# File 'lib/cql/io/io_reactor.rb', line 279

# Schedules a timer that is due +timeout+ after the clock's current time.
#
# While holding @lock, entries whose promise slot is nil are dropped and a
# new [deadline, promise] entry is appended; the timer list is replaced by
# a new array rather than mutated.
#
# @param timeout [Numeric] offset added to @clock.now to get the deadline
# @param promise [#future] promise stored with the timer; defaults to a
#   new Promise
# @return [Object] promise.future
def schedule_timer(timeout, promise=Promise.new)
  @lock.synchronize do
    pending = @timers.select { |entry| !entry[1].nil? }
    deadline = @clock.now + timeout
    @timers = pending.push([deadline, promise])
  end
  promise.future
end

#tick(timeout = 1) ⇒ Object



307
308
309
310
# File 'lib/cql/io/io_reactor.rb', line 307

# Runs one iteration of the loop body: first check_sockets! (given
# +timeout+), then check_timers!. Both helpers are defined outside this
# excerpt.
#
# @param timeout [Numeric] passed unchanged to check_sockets!; defaults to 1
#   (presumably seconds to wait for socket activity — confirm against
#   check_sockets!)
# @return [Object] whatever check_timers! returns
def tick(timeout=1)
  check_sockets!(timeout)
  check_timers!
end

#to_s ⇒ Object



312
313
314
# File 'lib/cql/io/io_reactor.rb', line 312

# Builds a human-readable description listing the tracked sockets.
#
# Note that the label uses IoReactor's class name, not this object's class.
#
# @return [String] e.g. "#<... @connections=[sock1, sock2]>"
def to_s
  label = IoReactor.name
  connections = @sockets.map(&:to_s).join(', ')
  "#<#{label} @connections=[#{connections}]>"
end