Class: ZMachine::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/zmachine/reactor.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeReactor

Returns a new instance of Reactor.



37
38
39
40
41
42
43
44
# File 'lib/zmachine/reactor.rb', line 37

def initialize
  @heartbeat_interval = ZMachine.heartbeat_interval || 0.5 # coarse grained by default
  @next_tick_queue = ConcurrentLinkedQueue.new
  @running = false
  @shutdown_hooks = []
  # a 10 ms tick wheel with 512 slots => ~5s for a round
  @wheel = HashedWheel.new(512, 10)
end

Class Method Details

.register_reactor(reactor) ⇒ Object



17
18
19
20
21
22
# File 'lib/zmachine/reactor.rb', line 17

def self.register_reactor(reactor)
  @mutex.synchronize do
    @reactors ||= []
    @reactors << reactor
  end
end

.terminate_all_reactorsObject



24
25
26
27
28
29
# File 'lib/zmachine/reactor.rb', line 24

def self.terminate_all_reactors
  @mutex.synchronize do
    @reactors.each(&:stop_event_loop)
    @reactors.clear
  end
end

.unregister_reactor(reactor) ⇒ Object



31
32
33
34
35
# File 'lib/zmachine/reactor.rb', line 31

def self.unregister_reactor(reactor)
  @mutex.synchronize do
    @reactors.delete(reactor)
  end
end

Instance Method Details

#add_shutdown_hook(&block) ⇒ Object



46
47
48
# File 'lib/zmachine/reactor.rb', line 46

def add_shutdown_hook(&block)
  @shutdown_hooks << block
end

#add_timer(*args, &block) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/zmachine/reactor.rb', line 50

def add_timer(*args, &block)
  check_reactor_thread
  interval = args.shift
  callback = args.shift || block
  ZMachine.logger.debug("zmachine:reactor:#{__method__}", interval: interval, callback: callback) if ZMachine.debug
  return unless callback
  @wheel.add((interval * 1000).to_i, &callback)
end

#bind(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object



59
60
61
62
63
# File 'lib/zmachine/reactor.rb', line 59

def bind(server, port_or_type=nil, handler=nil, *args, &block)
  ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug
  check_reactor_thread
  @connection_manager.bind(server, port_or_type, handler, *args, &block)
end

#close_connection(connection, after_writing = false, reason = nil) ⇒ Object



65
66
67
68
# File 'lib/zmachine/reactor.rb', line 65

def close_connection(connection, after_writing = false, reason = nil)
  return true unless @connection_manager
  @connection_manager.close_connection(connection, after_writing, reason)
end

#connect(server, port_or_type = nil, handler = nil, *args, &block) ⇒ Object



70
71
72
73
74
# File 'lib/zmachine/reactor.rb', line 70

def connect(server, port_or_type=nil, handler=nil, *args, &block)
  ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug
  check_reactor_thread
  @connection_manager.connect(server, port_or_type, handler, *args, &block)
end

#connectionsObject



76
77
78
# File 'lib/zmachine/reactor.rb', line 76

def connections
  @connection_manager.connections
end

#heartbeat_intervalObject



80
81
82
# File 'lib/zmachine/reactor.rb', line 80

def heartbeat_interval
  @heartbeat_interval
end

#heartbeat_interval=(value) ⇒ Object



84
85
86
87
# File 'lib/zmachine/reactor.rb', line 84

def heartbeat_interval=(value)
  value = 0.01 if value < 0.01
  @heartbeat_interval = value
end

#next_tick(callback = nil, &block) ⇒ Object



89
90
91
92
# File 'lib/zmachine/reactor.rb', line 89

def next_tick(callback=nil, &block)
  @next_tick_queue << (callback || block)
  wakeup if running?
end

#reconnect(server, port_or_type, handler) ⇒ Object



94
95
96
97
98
# File 'lib/zmachine/reactor.rb', line 94

def reconnect(server, port_or_type, handler)
  return handler if handler && handler.channel.is_a?(ZMQChannel)
  ZMachine.logger.debug("zmachine:reactor:#{__method__}", server: server, port_or_type: port_or_type) if ZMachine.debug
  connect(server, port_or_type, handler)
end

#run(callback = nil, shutdown_hook = nil, &block) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/zmachine/reactor.rb', line 100

def run(callback=nil, shutdown_hook=nil, &block)
  ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug
  add_shutdown_hook(shutdown_hook) if shutdown_hook
  begin
    Reactor.register_reactor(self)
    @running = true
    if callback = (callback || block)
      add_timer(0) { callback.call(self) }
    end
    @selector = Selector.open
    @connection_manager = ConnectionManager.new(@selector)
    @run_reactor = true
    run_reactor while @run_reactor
  ensure
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :selector) if ZMachine.debug
    @selector.close rescue nil
    @selector = nil
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :connections) if ZMachine.debug
    @connection_manager.shutdown
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :shutdown_hooks) if ZMachine.debug
    @shutdown_hooks.pop.call until @shutdown_hooks.empty?
    @next_tick_queue = ConcurrentLinkedQueue.new
    @running = false
    Reactor.unregister_reactor(self)
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", stop: :zcontext) if ZMachine.debug
    ZMachine.reactor = nil
  end
end

#run_reactorObject



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/zmachine/reactor.rb', line 129

def run_reactor
  ZMachine.logger.debug("zmachine:reactor:#{__method__}") if ZMachine.debug
  run_deferred_callbacks
  return unless @run_reactor
  @wheel.advance
  return unless @run_reactor
  @connection_manager.cleanup
  if @connection_manager.idle?
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: @heartbeat_interval) if ZMachine.debug
    @selector.select(@heartbeat_interval * 1000)
  else
    ZMachine.logger.debug("zmachine:reactor:#{__method__}", select: :now) if ZMachine.debug
    @selector.select_now
  end
  @connection_manager.process
end

#running?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/zmachine/reactor.rb', line 146

def running?
  @running
end

#stop_event_loopObject



150
151
152
153
154
# File 'lib/zmachine/reactor.rb', line 150

def stop_event_loop
  @run_reactor = false
  @connection_manager.shutdown
  wakeup
end

#stop_server(channel) ⇒ Object



156
157
158
# File 'lib/zmachine/reactor.rb', line 156

def stop_server(channel)
  channel.close
end