Module: CrystalRuby::Reactor

Defined in:
lib/crystalruby/reactor.rb

Overview

The Reactor represents a singleton Thread responsible for running all Ruby/Crystal interop code. Crystal’s Fiber scheduler and GC assume all code is run on a single thread. This module is responsible for multiplexing Ruby and Crystal code onto that single thread. Functions annotated with async: true are executed using callbacks, which allows them to be interleaved without blocking multiple Ruby threads.

Defined Under Namespace

Classes: SingleThreadViolation, StopReactor

Constant Summary collapse

# Queue of callable jobs: schedule_work! pushes lambdas onto it and the
# reactor thread started by start! pops and invokes them one at a time.
REACTOR_QUEUE = Queue.new
THREAD_MAP =

We maintain a map of threads, each with a mutex, condition variable, and result

# Default block for the per-thread state map. A missing key may be either a
# Thread or an integer thread id; in both cases one state hash (mutex,
# condition variable, result slot, thread id) is created and stored under the
# integer id, and a Thread key is additionally aliased to that same hash.
#
# The block always returns the freshly created state. Previously the last
# expression was a conditional assignment, so the first lookup by integer id
# stored the state but evaluated to nil.
Hash.new do |map, key|
  keyed_by_thread = key.is_a?(Thread)
  tid = keyed_by_thread ? key.object_id : key

  if keyed_by_thread
    # Remove both aliases once the Thread object is finalized.
    # NOTE(review): this block closes over the Thread it is attached to, and
    # the map also holds the Thread as a key - confirm these entries can
    # actually be reclaimed.
    ObjectSpace.define_finalizer(key) do
      THREAD_MAP.delete(key)
      THREAD_MAP.delete(key.object_id)
    end
  end

  state = {
    mux: Mutex.new,
    cond: ConditionVariable.new,
    result: nil,
    thread_id: tid
  }
  map[tid] = state
  map[key] = state if keyed_by_thread
  state
end
CALLBACKS_MAP =

We memoize callbacks, once per return type

# Default block that builds, and caches under its return type, the FFI
# callback used to hand a result back to a waiting thread. The callback takes
# a thread id, plus the returned value unless the return type is :void.
Hash.new do |cache, return_type|
  callback_args = [:int]
  callback_args << return_type unless return_type == :void

  cache[return_type] = FFI::Function.new(:void, callback_args) do |tid, ret|
    waiter = THREAD_MAP[tid]
    # Clear any stale error, publish the value, then wake the waiting thread.
    waiter[:error] = nil
    waiter[:result] = ret
    waiter[:cond].signal
  end
end
# FFI callback that reports an error for a thread id. It receives the error
# type name, message, a JSON-encoded backtrace and the thread id. The type
# name is used as the exception class when it names a constant whose ancestors
# include Exception; otherwise RuntimeError is used.
ERROR_CALLBACK =
  FFI::Function.new(:void, %i[string string string int]) do |error_type, message, backtrace, tid|
    type_name = error_type.to_sym
    names_exception = Object.const_defined?(type_name) &&
                      Object.const_get(type_name).ancestors.include?(Exception)
    error_class = names_exception ? Object.const_get(type_name) : RuntimeError

    error = error_class.new(message)
    error.set_backtrace(JSON.parse(backtrace))
    # No state is registered under this id, so raise right here instead.
    raise error unless THREAD_MAP.key?(tid)

    waiter = THREAD_MAP[tid]
    waiter[:error] = error
    waiter[:result] = nil
    waiter[:cond].signal
  end

Class Method Summary collapse

Class Method Details

.await_result! ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
# File 'lib/crystalruby/reactor.rb', line 63

# Waits on the current thread's condition variable and then returns the stored
# result, or re-raises the stored error.
#
# The caller must already hold the thread's mutex (schedule_work! calls this
# from inside `synchronize`); the mutex is released while waiting.
#
# When an error was stored, its backtrace is rebuilt from two parts: the
# error's own frames, cut three entries before the first frame mentioning
# 'call_blocking_function' (all frames are kept when no such frame exists),
# followed by this thread's caller frames from index 5 onwards.
# NOTE(review): the offsets 3 and 5 presumably skip reactor-internal frames -
# confirm against the real call chain.
#
# @return [Object] the value stored under :result
# @raise [Exception] the error stored under :error, if any
def await_result!
  mux, cond = thread_conditions.values_at(:mux, :cond)
  cond.wait(mux)

  if (error = thread_conditions[:error])
    # Either part may be missing (an error without a backtrace, or a call
    # stack shallower than five frames); default to empty so the original
    # error is raised rather than a TypeError/NoMethodError from the stitching.
    remote_frames = error.backtrace || []
    local_frames = caller[5..-1] || []
    cutoff = remote_frames.index { |frame| frame.include?('call_blocking_function') } || 2

    error.set_backtrace(remote_frames[0..(cutoff - 3)] + local_frames)
    raise error
  end

  thread_conditions[:result]
end

.halt_loop! ⇒ Object

Raises:



75
76
77
# File 'lib/crystalruby/reactor.rb', line 75

# Raises StopReactor. stop! schedules this as a job; the reactor loop in
# start! rescues StopReactor to leave its loop.
#
# @raise [StopReactor] always
def halt_loop!
  raise(StopReactor)
end

.init_single_thread_mode! ⇒ Object



162
163
164
165
166
167
# File 'lib/crystalruby/reactor.rb', line 162

# Switches the reactor into single-thread mode and records the calling thread
# as the main thread. Once the mode is on, later calls leave the recorded
# thread id untouched.
#
# @return [true]
def init_single_thread_mode!
  return @single_thread_mode if @single_thread_mode

  @main_thread_id = Thread.current.object_id
  @single_thread_mode = true
end

.running? ⇒ Boolean

Returns:

  • (Boolean)


158
159
160
# File 'lib/crystalruby/reactor.rb', line 158

# Reports whether the reactor thread exists and is still alive.
#
# @return [Boolean, nil] nil when no reactor thread has been created
def running?
  reactor = @main_loop
  reactor.nil? ? nil : reactor.alive?
end

.schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/crystalruby/reactor.rb', line 110

# Runs `op_name` on `receiver`, either directly or as a job on the reactor
# queue.
#
# The call is made directly when single-thread mode is on, or when the caller
# is the recorded main thread and the operation is not :yield. Otherwise a
# lambda is pushed onto REACTOR_QUEUE while holding the calling thread's mutex:
#
# * async (literal true): the operation is sent with the caller's thread id
#   and the memoized callback for `return_type` appended, followed by yield!.
# * blocking (literal true): the operation is sent, its result or
#   StandardError is stored in the caller's state, and the caller is signalled.
#   A StopReactor is re-raised after signalling.
# * otherwise: the operation is sent and yield! is called again unless it
#   returned 0.
#
# @param receiver [Object] object that receives `op_name`
# @param op_name [Symbol] method to invoke
# @param args [Array] positional arguments forwarded to the method
# @param return_type [Symbol] selects the callback from CALLBACKS_MAP (async only)
# @param blocking [Boolean] wait for the result via await_result!
# @param async [Boolean] deliver the result through a callback
# @param lib [Object, nil] forwarded to yield!
# @return [Object, nil] the direct call's value, the awaited result when
#   blocking, otherwise nil
# @raise [SingleThreadViolation] in single-thread mode when called from a
#   thread other than the recorded main thread
def schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil)
  if @single_thread_mode || (Thread.current.object_id == @main_thread_id && op_name != :yield)
    unless Thread.current.object_id == @main_thread_id
      raise SingleThreadViolation,
            "Single thread mode is enabled, cannot run in multi-threaded mode. " \
            "Reactor was started from: #{@main_thread_id}, then called from #{Thread.current.object_id}"
    end
    return receiver.send(op_name, *args)
  end

  tvars = thread_conditions
  tvars[:mux].synchronize do
    REACTOR_QUEUE.push(
      # `case true` only matches flags that are literally true.
      case true
      when async
        lambda {
          receiver.send(
            op_name, *args, tvars[:thread_id],
            CALLBACKS_MAP[return_type]
          )
          yield!(lib: lib, time: 0)
        }
      when blocking
        lambda {
          tvars[:error] = nil
          should_halt = false
          begin
            result = receiver.send(op_name, *args)
          rescue StopReactor
            should_halt = true
          rescue StandardError => e
            tvars[:error] = e
          end
          tvars[:result] = result unless tvars[:error]
          # Signal while holding the waiter's mutex. The waiter keeps that
          # mutex from the push until cond.wait releases it, so the signal
          # cannot fire before the waiter is waiting and be lost.
          tvars[:mux].synchronize { tvars[:cond].signal }
          raise StopReactor if should_halt
        }
      else
        lambda {
          outstanding_jobs = receiver.send(op_name, *args)
          yield!(lib: lib, time: 0) unless outstanding_jobs == 0
        }
      end
    )
    return await_result! if blocking
  end
end

.start! ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/crystalruby/reactor.rb', line 88

# Starts the reactor thread unless one has already been created, and returns
# it. The thread records its own id as the main thread id, then pops jobs off
# REACTOR_QUEUE and invokes them until one raises.
#
# @return [Thread] the reactor thread
def start!
  @main_loop ||= Thread.new do
    begin
      @main_thread_id = Thread.current.object_id
      CrystalRuby.log_debug("Starting reactor")
      CrystalRuby.log_debug("CrystalRuby initialized")
      while true
        job = REACTOR_QUEUE.pop
        job.call
      end
    rescue StopReactor
      # Requested shutdown: leave the loop without logging anything.
    rescue StandardError => failure
      CrystalRuby.log_error "Error: #{failure}"
      CrystalRuby.log_error failure.backtrace
    end
  end
end

.stop! ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/crystalruby/reactor.rb', line 79

# Stops the reactor thread if one exists: schedules halt_loop! as a blocking
# job, joins the thread, forgets it and logs that the loop stopped. Does
# nothing when no reactor thread has been created.
def stop!
  return unless @main_loop

  schedule_work!(self, :halt_loop!, :void, blocking: true, async: false)
  @main_loop.join
  @main_loop = nil
  CrystalRuby.log_info "Reactor loop stopped"
end

.thread_conditions ⇒ Object



59
60
61
# File 'lib/crystalruby/reactor.rb', line 59

# Looks up the state hash for the calling thread in THREAD_MAP. The lookup
# uses Hash#[], so the map's default block creates the entry when it is
# missing.
#
# @return [Hash] the state registered for Thread.current
def thread_conditions
  current = Thread.current
  THREAD_MAP[current]
end

.thread_id ⇒ Object



101
102
103
# File 'lib/crystalruby/reactor.rb', line 101

# Returns the object id of the calling thread.
#
# @return [Integer]
def thread_id
  current = Thread.current
  current.object_id
end

.yield!(lib: nil, time: 0.0) ⇒ Object



105
106
107
108
# File 'lib/crystalruby/reactor.rb', line 105

# Schedules a non-blocking, non-async :yield call on `lib` when the reactor is
# running and a lib was given; otherwise does nothing.
#
# @param lib [Object, nil] receiver of the :yield call
# @param time [Numeric] accepted but not used by this method
# @return [nil]
def yield!(lib: nil, time: 0.0)
  if running? && lib
    schedule_work!(lib, :yield, :int, async: false, blocking: false, lib: lib)
  end
  nil
end