Class: ZMachine::Reactor
- Inherits:
-
Object
- Object
- ZMachine::Reactor
- Defined in:
- lib/zmachine/reactor.rb
Class Method Summary collapse
- .register_reactor(reactor) ⇒ Object
- .terminate_all_reactors ⇒ Object
- .unregister_reactor(reactor) ⇒ Object
Instance Method Summary collapse
- #add_shutdown_hook(&block) ⇒ Object
- #add_timer(*args, &block) ⇒ Object
- #bind(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
- #close_connection(connection, after_writing = false, reason = nil) ⇒ Object
- #connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
- #connections ⇒ Object
- #heartbeat_interval ⇒ Object
- #heartbeat_interval=(value) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
- #next_tick(callback = nil, &block) ⇒ Object
- #reconnect(server, port_or_type, handler) ⇒ Object
- #run(callback = nil, shutdown_hook = nil, &block) ⇒ Object
- #run_reactor ⇒ Object
- #running? ⇒ Boolean
- #stop_event_loop ⇒ Object
- #stop_server(channel) ⇒ Object
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
37 38 39 40 41 42 43 44 |
# File 'lib/zmachine/reactor.rb', line 37 def initialize @heartbeat_interval = ZMachine.heartbeat_interval || 0.5 # coarse grained by default @next_tick_queue = ConcurrentLinkedQueue.new @running = false @shutdown_hooks = [] # a 10 ms tick wheel with 512 slots => ~5s for a round @wheel = HashedWheel.new(512, 10) end |
Class Method Details
.register_reactor(reactor) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/zmachine/reactor.rb', line 17 def self.register_reactor(reactor) @mutex.synchronize do @reactors ||= [] @reactors << reactor end end |
.terminate_all_reactors ⇒ Object
24 25 26 27 28 29 |
# File 'lib/zmachine/reactor.rb', line 24 def self.terminate_all_reactors @mutex.synchronize do @reactors.each(&:stop_event_loop) @reactors.clear end end |
.unregister_reactor(reactor) ⇒ Object
31 32 33 34 35 |
# File 'lib/zmachine/reactor.rb', line 31 def self.unregister_reactor(reactor) @mutex.synchronize do @reactors.delete(reactor) end end |
Instance Method Details
#add_shutdown_hook(&block) ⇒ Object
46 47 48 |
# File 'lib/zmachine/reactor.rb', line 46 def add_shutdown_hook(&block) @shutdown_hooks << block end |
#add_timer(*args, &block) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/zmachine/reactor.rb', line 50 def add_timer(*args, &block) check_reactor_thread interval = args.shift callback = args.shift || block ZMachine.logger.debug("zmachine:reactor:#{__method__}", interval: interval, callback: callback) if ZMachine.debug return unless callback @wheel.add((interval * 1000).to_i, &callback) end |
#bind(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
59 60 61 62 63 |
# File 'lib/zmachine/reactor.rb', line 59 def bind(server, port_or_type=nil, handler=nil, *args, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug check_reactor_thread @connection_manager.bind(server, port_or_type, handler, *args, &block) end |
#close_connection(connection, after_writing = false, reason = nil) ⇒ Object
65 66 67 68 |
# File 'lib/zmachine/reactor.rb', line 65 def close_connection(connection, after_writing = false, reason = nil) return true unless @connection_manager @connection_manager.close_connection(connection, after_writing, reason) end |
#connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object
70 71 72 73 74 |
# File 'lib/zmachine/reactor.rb', line 70 def connect(server, port_or_type=nil, handler=nil, *args, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug check_reactor_thread @connection_manager.connect(server, port_or_type, handler, *args, &block) end |
#connections ⇒ Object
76 77 78 |
# File 'lib/zmachine/reactor.rb', line 76 def connections @connection_manager.connections end |
#heartbeat_interval ⇒ Object
80 81 82 |
# File 'lib/zmachine/reactor.rb', line 80 def heartbeat_interval @heartbeat_interval end |
#heartbeat_interval=(value) ⇒ Object
84 85 86 87 |
# File 'lib/zmachine/reactor.rb', line 84 def heartbeat_interval=(value) value = 0.01 if value < 0.01 @heartbeat_interval = value end |
#next_tick(callback = nil, &block) ⇒ Object
89 90 91 92 |
# File 'lib/zmachine/reactor.rb', line 89 def next_tick(callback=nil, &block) @next_tick_queue << (callback || block) wakeup if running? end |
#reconnect(server, port_or_type, handler) ⇒ Object
94 95 96 97 98 |
# File 'lib/zmachine/reactor.rb', line 94 def reconnect(server, port_or_type, handler) return handler if handler && handler.channel.is_a?(ZMQChannel) ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug connect(server, port_or_type, handler) end |
#run(callback = nil, shutdown_hook = nil, &block) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/zmachine/reactor.rb', line 100 def run(callback=nil, shutdown_hook=nil, &block) ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug add_shutdown_hook(shutdown_hook) if shutdown_hook begin Reactor.register_reactor(self) @running = true if callback = (callback || block) add_timer(0) { callback.call(self) } end @selector = Selector.open @connection_manager = ConnectionManager.new(@selector) @run_reactor = true run_reactor while @run_reactor ensure ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :selector) if ZMachine.debug @selector.close rescue nil @selector = nil ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :connections) if ZMachine.debug @connection_manager.shutdown ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :shutdown_hooks) if ZMachine.debug @shutdown_hooks.pop.call until @shutdown_hooks.empty? @next_tick_queue = ConcurrentLinkedQueue.new @running = false Reactor.unregister_reactor(self) ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :zcontext) if ZMachine.debug ZMachine.reactor = nil end end |
#run_reactor ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/zmachine/reactor.rb', line 129 def run_reactor ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug run_deferred_callbacks return unless @run_reactor @wheel.advance return unless @run_reactor @connection_manager.cleanup if @connection_manager.idle? ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: @heartbeat_interval) if ZMachine.debug @selector.select(@heartbeat_interval * 1000) else ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: :now) if ZMachine.debug @selector.select_now end @connection_manager.process end |
#running? ⇒ Boolean
146 147 148 |
# File 'lib/zmachine/reactor.rb', line 146 def running? @running end |
#stop_event_loop ⇒ Object
150 151 152 153 154 |
# File 'lib/zmachine/reactor.rb', line 150 def stop_event_loop @run_reactor = false @connection_manager.shutdown wakeup end |
#stop_server(channel) ⇒ Object
156 157 158 |
# File 'lib/zmachine/reactor.rb', line 156 def stop_server(channel) channel.close end |