Class: Ractor::Wrapper

Inherits:
Object
Defined in:
lib/ractor/wrapper.rb

Overview

An experimental class that wraps a non-shareable object in an actor, allowing multiple Ractors to access it concurrently.

WARNING: This is a highly experimental library, and currently not recommended for production use. (As of Ruby 4.0.0, the same can be said of Ractors in general.)

What is Ractor::Wrapper?

For the most part, an object cannot be accessed directly from another Ractor unless it is shareable, which generally means deeply immutable along with a few other restrictions. This makes it difficult for multiple Ractors to share a stateful resource. Such a resource must typically itself be implemented as a Ractor and accessed via message passing.
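As a rough illustration of what "shareable" means in practice, the sketch below uses the core Ractor.shareable? and Ractor.make_shareable methods. The shareability_report helper is a hypothetical name used only for this example.

```ruby
# Hypothetical helper for illustration: reports which sample objects Ruby
# considers shareable between Ractors.
def shareability_report
  mutable = ["a".dup, "b".dup]                 # unfrozen array, unfrozen strings
  shallow = ["a".dup, "b".dup].freeze          # frozen array, mutable elements
  deep    = Ractor.make_shareable(["a".dup])   # deep-freezes array and elements

  {
    mutable: Ractor.shareable?(mutable),
    shallow: Ractor.shareable?(shallow),
    deep:    Ractor.shareable?(deep),
    integer: Ractor.shareable?(42),
  }
end
```

Only the deeply frozen array and the integer are reported as shareable; freezing just the outer array is not enough.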

Ractor::Wrapper makes it possible for an ordinary non-shareable object to be accessed from multiple Ractors. It does this by "wrapping" the object with an actor that listens for messages and invokes the object's methods in a controlled single-Ractor environment. It then provides a stub object that reproduces the interface of the original object, but responds to method calls by sending messages to the wrapper. Ractor::Wrapper can be used to implement simple actors by writing "plain" Ruby objects, or to adapt existing non-shareable objects to a multi-Ractor world.
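The wrapper-plus-stub arrangement described above can be sketched with plain threads. This is a simplified analogue under stated assumptions, not the library's implementation: it uses a Thread and Queue objects where the real wrapper uses a Ractor and ports, it ignores keyword arguments and blocks, and the SketchServer and SketchStub names are hypothetical.

```ruby
# Thread-based analogue of the wrapper: one worker owns the object and
# handles requests one at a time.
class SketchServer
  Request = Struct.new(:name, :args, :reply)

  def initialize(object)
    @inbox = Queue.new
    @worker = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty.
      while (request = @inbox.pop)
        begin
          result = object.public_send(request.name, *request.args)
          request.reply << [:ok, result]
        rescue StandardError => e
          request.reply << [:error, e]
        end
      end
    end
  end

  # Send a request and block until the worker replies.
  def call(name, *args)
    reply = Queue.new
    @inbox << Request.new(name, args, reply)
    status, value = reply.pop
    raise value if status == :error
    value
  end

  def stop
    @inbox.close
    @worker.join
    self
  end
end

# Analogue of the stub: reproduces the wrapped object's interface by
# forwarding every unknown method to the server.
class SketchStub
  def initialize(server)
    @server = server
  end

  def method_missing(name, *args)
    @server.call(name, *args)
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end
```

For example, SketchStub.new(SketchServer.new([3, 1, 2])).sort forwards the sort call to the worker thread and returns its result, and an exception raised by the wrapped object is re-raised in the caller.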

Net::HTTP example

The following example shows how to share a single Net::HTTP session object among multiple Ractors.

require "ractor/wrapper"
require "net/http"

# Create a Net::HTTP session. Net::HTTP sessions are not shareable,
# so normally only one Ractor can access them at a time.
http = Net::HTTP.new("example.com")
http.start

# Create a wrapper around the session. This moves the session into an
# internal Ractor and listens for method call requests. By default, a
# wrapper serializes calls, handling one at a time, for compatibility
# with non-thread-safe objects.
wrapper = Ractor::Wrapper.new(http)

# At this point, the session object can no longer be accessed directly
# because it is now owned by the wrapper's internal Ractor.
#     http.get("/whoops")  # <= raises Ractor::MovedError

# However, you can access the session via the stub object provided by
# the wrapper. This stub proxies the call to the wrapper's internal
# Ractor. And it's shareable, so any number of Ractors can use it.
response = wrapper.stub.get("/")

# Here, we start two Ractors, and pass the stub to each one. Each
# Ractor can simply call methods on the stub as if it were the original
# connection object. Internally, of course, the calls are proxied to
# the original object via the wrapper, and execution is serialized.
r1 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.get("/hello")
  end
  :ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.get("/ruby")
  end
  :ok
end

# Wait for the two above Ractors to finish.
r1.join
r2.join

# After you stop the wrapper, you can retrieve the underlying session
# object and access it directly again.
wrapper.async_stop
http = wrapper.recover_object
http.finish

SQLite3 example

The following example shows how to share a SQLite3 database among multiple Ractors.

require "ractor/wrapper"
require "sqlite3"

# Create a SQLite3 database. These objects are not shareable, so
# normally only one Ractor can access them.
db = SQLite3::Database.new($my_database_path)

# Create a wrapper around the database. A SQLite3::Database object
# cannot be moved between Ractors, so we configure the wrapper to run
# in the current Ractor. You can also configure it to run multiple
# worker threads because the database object itself is thread-safe.
wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2)

# At this point, the database object can still be accessed directly
# because it hasn't been moved to a different Ractor.
rows = db.execute("select * from numbers")

# You can also access the database via the stub object provided by the
# wrapper.
rows = wrapper.stub.execute("select * from numbers")

# Here, we start two Ractors, and pass the stub to each one. The
# wrapper's two worker threads will handle the requests in the order
# received.
r1 = Ractor.new(wrapper.stub) do |db_stub|
  5.times do
    rows = db_stub.execute("select * from numbers")
  end
  :ok
end
r2 = Ractor.new(wrapper.stub) do |db_stub|
  5.times do
    rows = db_stub.execute("select * from numbers")
  end
  :ok
end

# Wait for the two above Ractors to finish.
r1.join
r2.join

# After stopping the wrapper, you can call the join method to wait for
# it to completely finish.
wrapper.async_stop
wrapper.join

# When running a wrapper with :use_current_ractor, you do not need to
# recover the object, because it was never moved. The recover_object
# method is not available.
#     db2 = wrapper.recover_object  # <= raises Ractor::Error

Features

  • Provides a Ractor-shareable method interface to a non-shareable object.
  • Supports arbitrary method arguments and return values.
  • Can be configured to run in its own isolated Ractor or in a Thread in the current Ractor.
  • Can be configured per method whether to copy or move arguments and return values.
  • Blocks can be run in the calling Ractor or in the object Ractor.
  • Raises exceptions thrown by the method.
  • Can serialize method calls for non-thread-safe objects, or run methods concurrently in multiple worker threads for thread-safe objects.
  • Can gracefully shut down the wrapper and retrieve the original object.

Caveats

  • Certain types cannot be used as method arguments or return values because they cannot be moved between Ractors. As of Ruby 4.0.0, these include threads, backtraces, procs, and a few others.
  • As of Ruby 4.0.0, any exceptions raised are always copied (rather than moved) back to the calling Ractor, and the backtrace is cleared out. This is due to https://bugs.ruby-lang.org/issues/21818.
  • Blocks can be run "in place" (i.e. in the wrapped object's context) only if the block does not access any data outside the block. Otherwise, the block must be run in the caller's context.
  • Blocks configured to run in the caller's context can only be run while a method is executing. They cannot be "saved" as a proc to be run later unless they are configured to run "in place". In particular, using blocks as a syntax to define callbacks generally cannot be done through a wrapper.

Defined Under Namespace

Classes: MethodSettings, Stub

Constructor Details

#initialize(object, use_current_ractor: false, name: nil, threads: 0, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil, enable_logging: false) {|_self| ... } ⇒ Wrapper

Create a wrapper around the given object.

If you pass an optional block, the wrapper itself will be yielded to it, at which time you can set additional configuration options. In particular, method-specific configuration must be set in this block. The configuration is frozen once the object is constructed.
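The "configure in the block, then freeze" lifecycle can be sketched with a small stand-in class. ConfigurableSketch and settings_for are hypothetical names; the class only mimics the configuration lifecycle and does not wrap anything.

```ruby
# Hypothetical stand-in showing the "yield self, then freeze" pattern.
# It is not Ractor::Wrapper; it only mimics how settings become read-only.
class ConfigurableSketch
  def initialize(move_data: false)
    @settings = { nil => { move_data: move_data } } # nil key holds the defaults
    yield self if block_given?
    @settings.freeze
    freeze
  end

  # Record per-method settings; this works only before the settings freeze.
  def configure_method(method_name = nil, move_data: false)
    @settings[method_name&.to_sym] = { move_data: move_data }
  end

  # Look up a method's settings, falling back to the defaults.
  def settings_for(method_name)
    @settings[method_name.to_sym] || @settings[nil]
  end
end
```

Calling configure_method inside the block succeeds, while calling it after construction raises FrozenError in this sketch. With the real class, the equivalent shape would be passing a block to Ractor::Wrapper.new and calling configure_method on the yielded wrapper.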

Yields:

  • (_self)

Yield Parameters:

  • _self (Ractor::Wrapper): the wrapper being constructed

Raises:

  • (Ractor::MovedError): if the given object has already been moved

Parameters:

  • object

    The non-shareable object to wrap.

  • use_current_ractor (defaults to: false)

    If true, the wrapper is run in a thread in the current Ractor instead of spawning a new Ractor (the default behavior). This option can be used if the wrapped object cannot be moved or must run in the main Ractor.

  • name (defaults to: nil)

    A name for this wrapper. Used during logging.

  • threads (defaults to: 0)

    The number of worker threads to run. Defaults to 0, which causes the wrapper to run sequentially without spawning workers.

  • move_data (defaults to: false)

    If true, all communication will by default move instead of copy arguments and return values. Default is false. This setting can be overridden by other :move_* settings.

  • move_arguments (defaults to: nil)

    If true, all arguments will be moved instead of copied by default. If not set, uses the :move_data setting.

  • move_results (defaults to: nil)

    If true, return values are moved instead of copied by default. If not set, uses the :move_data setting.

  • move_block_arguments (defaults to: nil)

    If true, arguments to blocks are moved instead of copied by default. If not set, uses the :move_data setting.

  • move_block_results (defaults to: nil)

    If true, result values from blocks are moved instead of copied by default. If not set, uses the :move_data setting.

  • execute_blocks_in_place (defaults to: nil)

    If true, blocks passed to methods are made shareable and passed into the wrapper to be executed in the wrapped environment. If false (the default), blocks are replaced by a proc that passes messages back out to the caller and executes the block in the caller's environment.

  • enable_logging (defaults to: false)

    Set to true to enable logging. Default is false.
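The fallback from the specific :move_* options to :move_data can be sketched as follows. This is an illustration of the documented rule, not the library's code: resolve_move_settings and MOVE_OPTIONS are hypothetical names, and the sketch assumes that an explicit false overrides :move_data just as an explicit true does.

```ruby
# Option names taken from the parameter list; the constant itself is hypothetical.
MOVE_OPTIONS = %i[
  move_arguments move_results move_block_arguments move_block_results
].freeze

# Hypothetical helper: a specific option wins when it is set (true or false);
# :move_data is used only when the specific option is nil.
def resolve_move_settings(move_data: false, **specific)
  MOVE_OPTIONS.to_h do |option|
    value = specific[option]
    [option, value.nil? ? move_data : value]
  end
end
```

Under these assumptions, resolve_move_settings(move_data: true, move_results: false) moves everything except return values.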



# File 'lib/ractor/wrapper.rb', line 313

def initialize(object,
               use_current_ractor: false,
               name: nil,
               threads: 0,
               move_data: false,
               move_arguments: nil,
               move_results: nil,
               move_block_arguments: nil,
               move_block_results: nil,
               execute_blocks_in_place: nil,
               enable_logging: false)
  raise ::Ractor::MovedError, "cannot wrap a moved object" if ::Ractor::MovedObject === object

  @method_settings = {}
  self.name = name || object_id.to_s
  self.enable_logging = enable_logging
  self.threads = threads
  configure_method(move_data: move_data,
                   move_arguments: move_arguments,
                   move_results: move_results,
                   move_block_arguments: move_block_arguments,
                   move_block_results: move_block_results,
                   execute_blocks_in_place: execute_blocks_in_place)
  yield self if block_given?
  @method_settings.freeze

  if use_current_ractor
    setup_local_server(object)
  else
    setup_isolated_server(object)
  end
  @stub = Stub.new(self)

  freeze
end

Instance Attribute Details

#name ⇒ String

Return the name of this wrapper.

Returns:

  • (String)

# File 'lib/ractor/wrapper.rb', line 444

def name
  @name
end

#stub ⇒ Ractor::Wrapper::Stub (readonly)

Return the wrapper stub. This is an object that responds to the same methods as the wrapped object, providing an easy way to call a wrapper.

Returns:

  • (Ractor::Wrapper::Stub)

# File 'lib/ractor/wrapper.rb', line 491

def stub
  @stub
end

#threads ⇒ Integer

Return the number of worker threads used by the wrapper.

Returns:

  • (Integer)

# File 'lib/ractor/wrapper.rb', line 469

def threads
  @threads
end

Instance Method Details

#async_stop ⇒ self

Request that the wrapper stop. All currently running calls will complete before the wrapper actually terminates. However, any new calls will fail.

This method is idempotent and can be called multiple times (even from different Ractors).

Returns:

  • (self)

# File 'lib/ractor/wrapper.rb', line 541

def async_stop
  maybe_log("Stopping wrapper")
  @port.send(StopMessage.new.freeze)
  self
rescue ::Ractor::ClosedError
  # Ignore to allow stops to be idempotent.
  self
end
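The rescue clause in the listing above is what makes repeated stops harmless. A thread-based analogue of the same idea is sketched below, using a closed Queue (which raises ClosedQueueError on push) in place of a closed port. StoppableSketch is a hypothetical class, not part of the library.

```ruby
# Thread-based analogue (not the library's code): stop requests sent after
# shutdown hit a closed queue, and the resulting error is ignored.
class StoppableSketch
  def initialize
    @inbox = Queue.new
    @worker = Thread.new do
      # Exit on the first :stop message, then refuse further messages.
      loop { break if @inbox.pop == :stop }
      @inbox.close
    end
  end

  # Safe to call repeatedly: later calls are ignored once the queue is closed.
  def async_stop
    @inbox << :stop
    self
  rescue ClosedQueueError
    self
  end

  # Block until the worker has finished.
  def join
    @worker.join
    self
  end
end
```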

#call(method_name, *args, **kwargs) ⇒ Object

A lower-level interface for calling methods through the wrapper.

Parameters:

  • method_name: The name of the method to call

  • args: The positional arguments

  • kwargs: The keyword arguments

Returns:

  • The return value

# File 'lib/ractor/wrapper.rb', line 501

def call(method_name, *args, **kwargs, &)
  reply_port = ::Ractor::Port.new
  transaction = ::Random.rand(7_958_661_109_946_400_884_391_936).to_s(36).freeze
  settings = method_settings(method_name)
  block_arg = make_block_arg(settings, &)
  message = CallMessage.new(method_name: method_name,
                            args: args,
                            kwargs: kwargs,
                            block_arg: block_arg,
                            transaction: transaction,
                            settings: settings,
                            reply_port: reply_port)
  maybe_log("Sending method", method_name: method_name, transaction: transaction)
  @port.send(message, move: settings.move_arguments?)
  loop do
    reply_message = reply_port.receive
    case reply_message
    when YieldMessage
      handle_yield(reply_message, transaction, settings, method_name, &)
    when ReturnMessage
      maybe_log("Received result", method_name: method_name, transaction: transaction)
      reply_port.close
      return reply_message.value
    when ExceptionMessage
      maybe_log("Received exception", method_name: method_name, transaction: transaction)
      reply_port.close
      raise reply_message.exception
    end
  end
end
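The large literal passed to Random.rand in the listing above equals 36**16, so the transaction value is a random base-36 string of at most 16 characters. The sketch below re-creates just that expression; TRANSACTION_SPACE and sketch_transaction_id are hypothetical names introduced for illustration.

```ruby
# Illustrative re-creation of the transaction ID expression from #call.
# 36**16 is the number of distinct 16-character base-36 strings.
TRANSACTION_SPACE = 36**16

# Hypothetical helper: returns a frozen random base-36 string.
def sketch_transaction_id
  ::Random.rand(TRANSACTION_SPACE).to_s(36).freeze
end
```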

#configure_method(method_name = nil, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil) ⇒ Object

Configure the move semantics for the given method (or the default settings if no method name is given). That is, determine whether arguments, return values, and/or exceptions are copied or moved when communicated with the wrapper. By default, all objects are copied.

This method can be called only during an initialization block. All settings are frozen once the wrapper is active.
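The difference between copying and moving can be seen with core Ractor messaging alone, independent of this library. The sketch below assumes a Ruby version with Ractor support (Ruby may print an experimental-feature warning), and copy_vs_move_demo is a hypothetical helper name.

```ruby
# Sends one string by copy and one by move to a throwaway Ractor, then
# reports what the sender can still do with each.
def copy_vs_move_demo
  receiver = Ractor.new do
    2.times { Ractor.receive }
    :done
  end

  copied = "copied".dup
  receiver.send(copied)              # default: the receiver gets a deep copy
  copied << "!"                      # the sender's object is still usable

  moved = "moved".dup
  receiver.send(moved, move: true)   # ownership transfers to the receiver
  error =
    begin
      moved.upcase                   # calling a method on a moved object fails
      nil
    rescue Ractor::MovedError => e
      e
    end

  { copied: copied, move_error: error.class }
end
```

Moving avoids the cost of a deep copy, at the price of the caller losing access to the object, which is why the settings below let it be chosen per method.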

Parameters:

  • method_name (defaults to: nil)

    The name of the method being configured, or nil to set defaults for all methods not configured explicitly.

  • move_data (defaults to: false)

    If true, communication for this method will move instead of copy arguments and return values. Default is false. This setting can be overridden by other :move_* settings.

  • move_arguments (defaults to: nil)

    If true, arguments for this method are moved instead of copied. If not set, uses the :move_data setting.

  • move_results (defaults to: nil)

    If true, return values for this method are moved instead of copied. If not set, uses the :move_data setting.

  • move_block_arguments (defaults to: nil)

    If true, arguments to blocks passed to this method are moved instead of copied. If not set, uses the :move_data setting.

  • move_block_results (defaults to: nil)

    If true, result values from blocks passed to this method are moved instead of copied. If not set, uses the :move_data setting.

  • execute_blocks_in_place (defaults to: nil)

    If true, blocks passed to this method are made shareable and passed into the wrapper to be executed in the wrapped environment. If false (the default), blocks are replaced by a proc that passes messages back out to the caller and executes the block in the caller's environment.

# File 'lib/ractor/wrapper.rb', line 422

def configure_method(method_name = nil,
                     move_data: false,
                     move_arguments: nil,
                     move_results: nil,
                     move_block_arguments: nil,
                     move_block_results: nil,
                     execute_blocks_in_place: nil)
  method_name = method_name.to_sym unless method_name.nil?
  @method_settings[method_name] =
    MethodSettings.new(move_data: move_data,
                       move_arguments: move_arguments,
                       move_results: move_results,
                       move_block_arguments: move_block_arguments,
                       move_block_results: move_block_results,
                       execute_blocks_in_place: execute_blocks_in_place)
end

#enable_logging=(value) ⇒ Object

Enable or disable internal debug logging.

This method can be called only during an initialization block. All settings are frozen once the wrapper is active.

Parameters:

  • value: If truthy, logging is enabled; otherwise it is disabled.

# File 'lib/ractor/wrapper.rb', line 375

def enable_logging=(value)
  @enable_logging = value ? true : false
end

#enable_logging? ⇒ Boolean

Return whether logging is enabled for this wrapper.

Returns:

  • (Boolean)

# File 'lib/ractor/wrapper.rb', line 460

def enable_logging?
  @enable_logging
end

#join ⇒ self

Blocks until the wrapper has fully stopped.

Returns:

  • (self)

# File 'lib/ractor/wrapper.rb', line 555

def join
  if @ractor
    @ractor.join
  else
    reply_port = ::Ractor::Port.new
    @port.send(JoinMessage.new(reply_port))
    reply_port.receive
    reply_port.close
  end
  self
rescue ::Ractor::ClosedError
  self
end

#method_settings(method_name) ⇒ MethodSettings

Return the method settings for the given method name. This returns the default method settings if the given method is not configured explicitly by name.

Parameters:

  • method_name: The method name, or nil to return the defaults.

Returns:

  • (MethodSettings)

# File 'lib/ractor/wrapper.rb', line 480

def method_settings(method_name)
  method_name = method_name.to_sym
  @method_settings[method_name] || @method_settings[nil]
end

#recover_object ⇒ Object

Retrieves the original object that was wrapped. This should be called only after a stop request has been issued using #async_stop, and may block until the wrapper has fully stopped.

This can be called only if the wrapper was not configured with use_current_ractor: true. If the wrapper had that configuration, the object will not be moved, and does not need to be recovered. In such a case, any calls to this method will raise Ractor::Error.

Only one ractor may call this method; any additional calls will fail with a Ractor::Error.

Raises:

  • (Ractor::Error): if the wrapper was configured with use_current_ractor: true

Returns:

  • The original wrapped object

# File 'lib/ractor/wrapper.rb', line 584

def recover_object
  raise ::Ractor::Error, "cannot recover an object from a local wrapper" unless @ractor
  @ractor.value
end

#use_current_ractor? ⇒ Boolean

Determine whether this wrapper runs in the current Ractor.

Returns:

  • (Boolean)

# File 'lib/ractor/wrapper.rb', line 451

def use_current_ractor?
  @ractor.nil?
end