Module: CrystalRuby::Reactor
- Defined in:
- lib/crystalruby/reactor.rb
Overview
The Reactor represents a singleton Thread responsible for running all Ruby/crystal interop code. Crystal’s Fiber scheduler and GC assume all code is run on a single thread. This class is responsible for multiplexing Ruby and Crystal code onto a single thread. Functions annotated with async: true, are executed using callbacks to allow these to be interleaved without blocking multiple Ruby threads.
Defined Under Namespace
Classes: SingleThreadViolation, StopReactor
Constant Summary collapse
- REACTOR_QUEUE =
Queue.new
- THREAD_MAP =
We maintain a map of threads, each with a mutex, condition variable, and result
Hash.new do |h, tid_or_thread, tid = tid_or_thread| if tid_or_thread.is_a?(Thread) ObjectSpace.define_finalizer(tid_or_thread) do THREAD_MAP.delete(tid_or_thread) THREAD_MAP.delete(tid_or_thread.object_id) end tid = tid_or_thread.object_id end h[tid] = { mux: Mutex.new, cond: ConditionVariable.new, result: nil, thread_id: tid } h[tid_or_thread] = h[tid] if tid_or_thread.is_a?(Thread) end
- CALLBACKS_MAP =
We memoize callbacks, once per return type
Hash.new do |h, rt| h[rt] = FFI::Function.new(:void, [:int, *(rt == :void ? [] : [rt])]) do |tid, ret| THREAD_MAP[tid][:error] = nil THREAD_MAP[tid][:result] = ret THREAD_MAP[tid][:cond].signal end end
- ERROR_CALLBACK =
FFI::Function.new(:void, %i[string string string int]) do |error_type, , backtrace, tid| error_type = error_type.to_sym is_exception_type = Object.const_defined?(error_type) && Object.const_get(error_type).ancestors.include?(Exception) error_type = is_exception_type ? Object.const_get(error_type) : RuntimeError error = error_type.new() error.set_backtrace(JSON.parse(backtrace)) raise error unless THREAD_MAP.key?(tid) THREAD_MAP[tid][:error] = error THREAD_MAP[tid][:result] = nil THREAD_MAP[tid][:cond].signal end
Class Method Summary collapse
- .await_result! ⇒ Object
- .halt_loop! ⇒ Object
- .init_single_thread_mode! ⇒ Object
- .running? ⇒ Boolean
- .schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) ⇒ Object
- .start! ⇒ Object
- .stop! ⇒ Object
- .thread_conditions ⇒ Object
- .thread_id ⇒ Object
- .yield!(lib: nil, time: 0.0) ⇒ Object
Class Method Details
.await_result! ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/crystalruby/reactor.rb', line 63 def await_result! mux, cond = thread_conditions.values_at(:mux, :cond) cond.wait(mux) if error = thread_conditions[:error] combined_backtrace = error.backtrace[0..(error.backtrace.index{|m| m.include?('call_blocking_function')} || 2) - 3] + caller[5..-1] error.set_backtrace(combined_backtrace) raise error end thread_conditions[:result] end |
.halt_loop! ⇒ Object
75 76 77 |
# File 'lib/crystalruby/reactor.rb', line 75 def halt_loop! raise StopReactor end |
.init_single_thread_mode! ⇒ Object
162 163 164 165 166 167 |
# File 'lib/crystalruby/reactor.rb', line 162 def init_single_thread_mode! @single_thread_mode ||= begin @main_thread_id = Thread.current.object_id true end end |
.running? ⇒ Boolean
158 159 160 |
# File 'lib/crystalruby/reactor.rb', line 158 def running? @main_loop&.alive? end |
.schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/crystalruby/reactor.rb', line 110 def schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) if @single_thread_mode || (Thread.current.object_id == @main_thread_id && op_name != :yield) unless Thread.current.object_id == @main_thread_id raise SingleThreadViolation, "Single thread mode is enabled, cannot run in multi-threaded mode. " \ "Reactor was started from: #{@main_thread_id}, then called from #{Thread.current.object_id}" end return receiver.send(op_name, *args) end tvars = thread_conditions tvars[:mux].synchronize do REACTOR_QUEUE.push( case true when async lambda { receiver.send( op_name, *args, tvars[:thread_id], CALLBACKS_MAP[return_type] ) yield!(lib: lib, time: 0) } when blocking lambda { tvars[:error] = nil should_halt = false begin result = receiver.send(op_name, *args) rescue StopReactor => e should_halt = true rescue StandardError => e tvars[:error] = e end tvars[:result] = result unless tvars[:error] tvars[:cond].signal raise StopReactor if should_halt } else lambda { outstanding_jobs = receiver.send(op_name, *args) yield!(lib: lib, time: 0) unless outstanding_jobs == 0 } end ) return await_result! if blocking end end |
.start! ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/crystalruby/reactor.rb', line 88 def start! @main_loop ||= Thread.new do @main_thread_id = Thread.current.object_id CrystalRuby.log_debug("Starting reactor") CrystalRuby.log_debug("CrystalRuby initialized") REACTOR_QUEUE.pop[] while true rescue StopReactor => e rescue StandardError => e CrystalRuby.log_error "Error: #{e}" CrystalRuby.log_error e.backtrace end end |
.stop! ⇒ Object
79 80 81 82 83 84 85 86 |
# File 'lib/crystalruby/reactor.rb', line 79 def stop! if @main_loop schedule_work!(self, :halt_loop!, :void, blocking: true, async: false) @main_loop.join @main_loop = nil CrystalRuby.log_info "Reactor loop stopped" end end |
.thread_conditions ⇒ Object
59 60 61 |
# File 'lib/crystalruby/reactor.rb', line 59 def thread_conditions THREAD_MAP[Thread.current] end |
.thread_id ⇒ Object
101 102 103 |
# File 'lib/crystalruby/reactor.rb', line 101 def thread_id Thread.current.object_id end |
.yield!(lib: nil, time: 0.0) ⇒ Object
105 106 107 108 |
# File 'lib/crystalruby/reactor.rb', line 105 def yield!(lib: nil, time: 0.0) schedule_work!(lib, :yield, :int, async: false, blocking: false, lib: lib) if running? && lib nil end |