Class: Ractor::Wrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/ractor/wrapper.rb

Overview

An experimental class that wraps a non-shareable object, allowing multiple Ractors to access it concurrently.

WARNING: This is a highly experimental library, and currently not recommended for production use. (As of Ruby 3.0.0, the same can be said of Ractors in general.)

What is Ractor::Wrapper?

Ractors for the most part cannot access objects concurrently with other Ractors unless the object is shareable (that is, deeply immutable along with a few other restrictions.) If multiple Ractors need to interact with a shared resource that is stateful or otherwise not Ractor-shareable, that resource must itself be implemented and accessed as a Ractor.

Ractor::Wrapper makes it possible for such a shared resource to be implemented as an object and accessed using ordinary method calls. It does this by "wrapping" the object in a Ractor, and mapping method calls to message passing. This may make it easier to implement such a resource with a simple class rather than a full-blown Ractor with message passing, and it may also useful for adapting existing legacy object-based implementations.

Given a shared resource object, Ractor::Wrapper starts a new Ractor and "runs" the object within that Ractor. It provides you with a stub object on which you can invoke methods. The wrapper responds to these method calls by sending messages to the internal Ractor, which invokes the shared object and then sends back the result. If the underlying object is thread-safe, you can configure the wrapper to run multiple threads that can run methods concurrently. Or, if not, the wrapper can serialize requests to the object.

Example usage

The following example shows how to share a single Faraday::Conection object among multiple Ractors. Because Faraday::Connection is not itself thread-safe, this example serializes all calls to it.

require "faraday"

# Create a Faraday connection and a wrapper for it.
connection = Faraday.new "http://example.com"
wrapper = Ractor::Wrapper.new(connection)

# At this point, the connection object cannot be accessed directly
# because it has been "moved" to the wrapper's internal Ractor.
#     connection.get("/whoops")  # <= raises an error

# However, any number of Ractors can now access it through the wrapper.
# By default, access to the object is serialized; methods will not be
# invoked concurrently.
r1 = Ractor.new(wrapper) do |w|
  10.times do
    w.stub.get("/hello")
  end
  :ok
end
r2 = Ractor.new(wrapper) do |w|
  10.times do
    w.stub.get("/ruby")
  end
  :ok
end

# Wait for the two above Ractors to finish.
r1.take
r2.take

# After you stop the wrapper, you can retrieve the underlying
# connection object and access it directly again.
wrapper.async_stop
connection = wrapper.recover_object
connection.get("/finally")

Features

  • Provides a method interface to an object running in a different Ractor.
  • Supports arbitrary method arguments and return values.
  • Supports exceptions thrown by the method.
  • Can be configured to copy or move arguments, return values, and exceptions, per method.
  • Can serialize method calls for non-concurrency-safe objects, or run methods concurrently in multiple worker threads for thread-safe objects.
  • Can gracefully shut down the wrapper and retrieve the original object.

Caveats

Ractor::Wrapper is subject to some limitations (and bugs) of Ractors, as of Ruby 3.0.0.

  • You cannot pass blocks to wrapped methods.
  • Certain types cannot be used as method arguments or return values because Ractor does not allow them to be moved between Ractors. These include threads, procs, backtraces, and a few others.
  • You can call wrapper methods from multiple Ractors concurrently, but you cannot call them from multiple Threads within a single Ractor. (This is due to https://bugs.ruby-lang.org/issues/17624)
  • If you close the incoming port on a Ractor, it will no longer be able to call out via a wrapper. If you close its incoming port while a call is currently pending, that call may hang. (This is due to https://bugs.ruby-lang.org/issues/17617)

Defined Under Namespace

Classes: Message, MethodSettings, Stub

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(object, threads: 1, move: false, move_arguments: nil, move_return: nil, logging: false, name: nil) {|_self| ... } ⇒ Wrapper

Create a wrapper around the given object.

If you pass an optional block, the wrapper itself will be yielded to it at which time you can set additional configuration options. (The configuration is frozen once the object is constructed.)

Parameters:

  • object (Object)

    The non-shareable object to wrap.

  • threads (Integer) (defaults to: 1)

    The number of worker threads to run. Defaults to 1, which causes the worker to serialize calls.

Yields:

  • (_self)

Yield Parameters:



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/ractor/wrapper.rb', line 118

def initialize(object,
               threads: 1,
               move: false,
               move_arguments: nil,
               move_return: nil,
               logging: false,
               name: nil)
  @method_settings = {}
  self.threads = threads
  self.logging = logging
  self.name = name
  configure_method(move: move, move_arguments: move_arguments, move_return: move_return)
  yield self if block_given?
  @method_settings.freeze

  maybe_log("Starting server")
  @ractor = ::Ractor.new(name: name) { Server.new.run }
  opts = {
    object: object,
    threads: @threads,
    method_settings: @method_settings,
    name: @name,
    logging: @logging,
  }
  @ractor.send(opts, move: true)

  maybe_log("Server ready")
  @stub = Stub.new(self)
  freeze
end

Instance Attribute Details

#loggingBoolean

Return whether logging is enabled for this wrapper.

Returns:

  • (Boolean)


237
238
239
# File 'lib/ractor/wrapper.rb', line 237

def logging
  @logging
end

#nameString?

Return the name of this wrapper.

Returns:

  • (String, nil)


244
245
246
# File 'lib/ractor/wrapper.rb', line 244

def name
  @name
end

#stubRactor::Wrapper::Stub (readonly)

Return the wrapper stub. This is an object that responds to the same methods as the wrapped object, providing an easy way to call a wrapper.



223
224
225
# File 'lib/ractor/wrapper.rb', line 223

def stub
  @stub
end

#threadsInteger

Return the number of threads used by the wrapper.

Returns:

  • (Integer)


230
231
232
# File 'lib/ractor/wrapper.rb', line 230

def threads
  @threads
end

Instance Method Details

#async_stopself

Request that the wrapper stop. All currently running calls will complete before the wrapper actually terminates. However, any new calls will fail.

This metnod is idempotent and can be called multiple times (even from different ractors).

Returns:

  • (self)


294
295
296
297
298
299
300
301
# File 'lib/ractor/wrapper.rb', line 294

def async_stop
  maybe_log("Stopping #{name}")
  @ractor.send(Message.new(:stop))
  self
rescue ::Ractor::ClosedError
  # Ignore to allow stops to be idempotent.
  self
end

#call(method_name, *args, **kwargs) ⇒ Object

A lower-level interface for calling methods through the wrapper.

Parameters:

  • method_name (Symbol)

    The name of the method to call

  • args (arguments)

    The positional arguments

  • kwargs (keywords)

    The keyword arguments

Returns:

  • (Object)

    The return value



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/ractor/wrapper.rb', line 268

def call(method_name, *args, **kwargs)
  request = Message.new(:call, data: [method_name, args, kwargs])
  transaction = request.transaction
  move = method_settings(method_name).move_arguments?
  maybe_log("Sending method #{method_name} (move=#{move}, transaction=#{transaction})")
  @ractor.send(request, move: move)
  reply = ::Ractor.receive_if { |msg| msg.is_a?(Message) && msg.transaction == transaction }
  case reply.type
  when :result
    maybe_log("Received result for method #{method_name} (transaction=#{transaction})")
    reply.data
  when :error
    maybe_log("Received exception for method #{method_name} (transaction=#{transaction})")
    raise reply.data
  end
end

#configure_method(method_name = nil, move: false, move_arguments: nil, move_return: nil) ⇒ Object

Configure the move semantics for the given method (or the default settings if no method name is given.) That is, determine whether arguments, return values, and/or exceptions are copied or moved when communicated with the wrapper. By default, all objects are copied.

This method can be called only during an initialization block. All settings are frozen once the wrapper is active.

Parameters:

  • method_name (Symbol, nil) (defaults to: nil)

    The name of the method being configured, or nil to set defaults for all methods not configured explicitly.

  • move (Boolean) (defaults to: false)

    Whether to move all communication. This value, if given, is used if move_arguments, move_return, or move_exceptions are not set.

  • move_arguments (Boolean) (defaults to: nil)

    Whether to move arguments.

  • move_return (Boolean) (defaults to: nil)

    Whether to move return values.



208
209
210
211
212
213
214
215
# File 'lib/ractor/wrapper.rb', line 208

def configure_method(method_name = nil,
                     move: false,
                     move_arguments: nil,
                     move_return: nil)
  method_name = method_name.to_sym unless method_name.nil?
  @method_settings[method_name] =
    MethodSettings.new(move: move, move_arguments: move_arguments, move_return: move_return)
end

#method_settings(method_name) ⇒ MethodSettings

Return the method settings for the given method name. This returns the default method settings if the given method is not configured explicitly by name.

Parameters:

  • method_name (Symbol, nil)

    The method name, or nil to return the defaults.

Returns:



255
256
257
258
# File 'lib/ractor/wrapper.rb', line 255

def method_settings(method_name)
  method_name = method_name.to_sym
  @method_settings[method_name] || @method_settings[nil]
end

#recovered_objectObject

Retrieves the original object that was wrapped. This should be called only after a stop request has been issued using #async_stop, and may block until the wrapper has fully stopped.

Only one ractor may call this method; any additional calls will fail.

Returns:

  • (Object)

    The original wrapped object



312
313
314
# File 'lib/ractor/wrapper.rb', line 312

def recovered_object
  @ractor.take
end