Class: NIO::WebSocket::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/nio/websocket/reactor.rb

Class Method Summary collapse

Class Method Details

.queue_task(&blk) ⇒ Object



7
8
9
10
11
12
13
14
# File 'lib/nio/websocket/reactor.rb', line 7

# Appends the given block to the pending task list and wakes the selector.
#
# The task list is created on first use and is only touched while holding
# task_mutex. The reactor loop in start swaps this list out and calls each
# queued block; waking the selector keeps that loop from blocking in select
# while there is work waiting.
#
# @yield the work to queue
# @return [Object, nil] whatever selector.wakeup returns, or nil when called
#   without a block (in which case nothing is queued and nothing is woken)
def queue_task(&blk)
  if blk
    task_mutex.synchronize { (@task_queue ||= []) << blk }
    selector.wakeup
  end
end

.reset ⇒ Object



20
21
22
23
24
25
26
# File 'lib/nio/websocket/reactor.rb', line 20

# Stops the reactor thread, if there is one, and forgets all cached state
# (selector, thread, task queue and mutex) so that later calls rebuild it.
#
# NOTE(review): the selector reference is dropped without being closed here;
# confirm whether NIO::Selector#close should be called once the thread is gone.
#
# @return [nil]
def reset
  running = @reactor
  running.exit if running
  @selector = @reactor = @task_queue = @task_mutex = nil
end

.selector ⇒ Object



16
17
18
# File 'lib/nio/websocket/reactor.rb', line 16

# Returns the shared selector, building a new NIO::Selector the first time
# it is asked for (or the first time after the cached one has been cleared).
#
# @return [NIO::Selector]
def selector
  return @selector if @selector

  @selector = NIO::Selector.new
end

.start ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/nio/websocket/reactor.rb', line 28

# Starts the reactor thread unless one already exists, and returns it.
#
# Every pass of the loop:
#   1. swaps the pending task list for an empty one under task_mutex and
#      calls each drained block outside the lock;
#   2. calls selector.select with a timeout argument of 1 and, for every
#      monitor yielded, invokes monitor.value if it responds to +call+;
#   3. yields the scheduler so other threads can get at the selector.
#
# A StandardError from a monitor callback is logged and that monitor is
# closed, leaving the loop running. A StandardError from anywhere else in the
# loop is logged as fatal and re-raised; because the thread sets
# abort_on_exception, that re-raise is propagated to the main thread.
#
# @return [Thread] the reactor thread (the existing one if already started)
def start
  WebSocket.logger.debug "Starting reactor" unless @reactor
  @reactor ||= Thread.start do
    Thread.current.abort_on_exception = true
    WebSocket.logger.info "Reactor started"
    begin
      loop do
        # Take ownership of everything queued so far; new tasks land in the fresh list.
        pending = task_mutex.synchronize do
          drained = @task_queue || []
          @task_queue = []
          drained
        end
        # Anything queued while these run also wakes the selector, so the select below will not block on it.
        pending.each { |task| task.call }

        selector.select(1) do |monitor|
          begin
            if monitor.value.respond_to?(:call)
              monitor.value.call(monitor)
            end
          rescue => callback_error
            WebSocket.logger.error "Error occured in callback on socket #{monitor.io}.  No longer handling this connection."
            WebSocket.logger.error "#{callback_error.class}: #{callback_error.message}"
            callback_error.backtrace.each { |frame| WebSocket.logger.error "\t#{frame}" }
            # Drop the offending monitor so a misbehaving driver or sloppy disconnect cannot take down the shared loop.
            monitor.close
          end
        end
        # Let other threads touch the selector (e.g. the main thread registering a new connection).
        Thread.pass
      end
    rescue => reactor_error
      WebSocket.logger.fatal "Error occured in reactor subsystem."
      WebSocket.logger.fatal "#{reactor_error.class}: #{reactor_error.message}"
      reactor_error.backtrace.each { |frame| WebSocket.logger.fatal "\t#{frame}" }
      raise
    end
  end
end