Class: RocketChat::Realtime::Reactor
- Inherits:
-
Object
- Object
- RocketChat::Realtime::Reactor
- Extended by:
- Forwardable
- Includes:
- Singleton
- Defined in:
- lib/rocket_chat/realtime/reactor.rb
Overview
The scheduler manager
Instance Attribute Summary collapse
- #clients ⇒ Object readonly
- #selector ⇒ Object readonly
Instance Method Summary collapse
-
#deregister(client) ⇒ Object
Deregister Client.
-
#initialize ⇒ Reactor
constructor
Initialize Reactor.
-
#register(client) ⇒ Object
Register Client.
-
#registered?(client) ⇒ Boolean
The client is registered.
-
#reset ⇒ Object
Reset reactor state.
-
#run ⇒ Object
Wait I/O ready for read or write.
-
#stop ⇒ Object
Stop reactor.
-
#stopped? ⇒ Boolean
The reactor is stopped.
Constructor Details
#initialize ⇒ Reactor
Initialize Reactor
40 41 42 43 44 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 40 def initialize @selector = NIO::Selector.new @clients = Set.new reset end |
Instance Attribute Details
#clients ⇒ Object (readonly)
35 36 37 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 35 def clients @clients end |
#selector ⇒ Object (readonly)
35 36 37 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 35 def selector @selector end |
Instance Method Details
#deregister(client) ⇒ Object
Deregister Client
76 77 78 79 80 81 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 76 def deregister(client) return unless registered?(client) @clients.delete(client) selector.deregister(client.connector.socket) end |
#register(client) ⇒ Object
Register Client
62 63 64 65 66 67 68 69 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 62 def register(client) return if registered?(client) @clients.add(client) monitor = selector.register(client.connector.socket, :rw) monitor.value = client monitor end |
#registered?(client) ⇒ Boolean
The client is registered
53 54 55 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 53 def registered?(client) @clients.include?(client) end |
#reset ⇒ Object
Reset reactor state
86 87 88 89 90 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 86 def reset # TODO: Clear clients and registered monitor @thread&.exit @thread = nil end |
#run ⇒ Object
Wait I/O ready for read or write
110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 110 def run return unless stopped? @thread = Thread.start do Thread.current.abort_on_exception = true until stopped? selector.select(1) do |monitor| monitor.value.process(monitor) end Thread.pass end end end |
#stop ⇒ Object
Stop reactor
95 96 97 98 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 95 def stop @thread&.exit @thread = nil end |
#stopped? ⇒ Boolean
Returns the reactor is stopped.
103 104 105 |
# File 'lib/rocket_chat/realtime/reactor.rb', line 103 def stopped? @thread.nil? end |