Class: Ractor::Wrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/ractor/wrapper.rb

Overview

An experimental class that wraps a non-shareable object in an actor, allowing multiple Ractors to access it concurrently.

WARNING: This is a highly experimental library, and currently not recommended for production use. (As of Ruby 4.0.0, the same can be said of Ractors in general.)

What is Ractor::Wrapper?

For the most part, unless an object is sharable, which generally means deeply immutable along with a few other restrictions, it cannot be accessed directly from another Ractor. This makes it difficult for multiple Ractors to share a resource that is stateful. Such a resource must typically itself be implemented as a Ractor and accessed via message passing.

Ractor::Wrapper makes it possible for an ordinary non-shareable object to be accessed from multiple Ractors. It does this by "wrapping" the object with an actor that listens for messages and invokes the object's methods in a controlled single-Ractor environment. It then provides a stub object that reproduces the interface of the original object, but responds to method calls by sending messages to the wrapper. Ractor::Wrapper can be used to implement simple actors by writing "plain" Ruby objects, or to adapt existing non-shareable objects to a multi-Ractor world.

Net::HTTP example

The following example shows how to share a single Net::HTTP session object among multiple Ractors.

require "ractor/wrapper"
require "net/http"

# Create a Net::HTTP session. Net::HTTP sessions are not shareable,
# so normally only one Ractor can access them at a time.
http = Net::HTTP.new("example.com")
http.start

# Create a wrapper around the session. This moves the session into an
# internal Ractor and listens for method call requests. By default, a
# wrapper serializes calls, handling one at a time, for compatibility
# with non-thread-safe objects.
wrapper = Ractor::Wrapper.new(http)

# At this point, the session object can no longer be accessed directly
# because it is now owned by the wrapper's internal Ractor.
#     http.get("/whoops")  # <= raises Ractor::MovedError

# However, you can access the session via the stub object provided by
# the wrapper. This stub proxies the call to the wrapper's internal
# Ractor. And it's shareable, so any number of Ractors can use it.
response = wrapper.stub.get("/")

# Here, we start two Ractors, and pass the stub to each one. Each
# Ractor can simply call methods on the stub as if it were the original
# connection object. Internally, of course, the calls are proxied to
# the original object via the wrapper, and execution is serialized.
r1 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.get("/hello")
  end
  :ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.get("/ruby")
  end
  :ok
end

# Wait for the two above Ractors to finish.
r1.join
r2.join

# After you stop the wrapper, you can retrieve the underlying session
# object and access it directly again.
wrapper.async_stop
http = wrapper.recover_object
http.finish

SQLite3 example

The following example shows how to share a SQLite3 database among multiple Ractors.

require "ractor/wrapper"
require "sqlite3"

# Create a SQLite3 database. These objects are not shareable, so
# normally only one Ractor can access them.
db = SQLite3::Database.new($my_database_path)

# Create a wrapper around the database. A SQLite3::Database object
# cannot be moved between Ractors, so we configure the wrapper to run
# in the current Ractor. We can also configure it to run multiple
# worker threads because the database object itself is thread-safe.
wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2)

# At this point, the database object can still be accessed directly
# because it hasn't been moved to a different Ractor.
rows = db.execute("select * from numbers")

# You can also access the database via the stub object provided by the
# wrapper.
rows = wrapper.stub.execute("select * from numbers")

# Here, we start two Ractors, and pass the stub to each one. The
# wrapper's worker threads will handle the requests concurrently.
r1 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.execute("select * from numbers")
  end
  :ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
  5.times do
    stub.execute("select * from numbers")
  end
  :ok
end

# Wait for the two above Ractors to finish.
r1.join
r2.join

# After stopping the wrapper, you can call the join method to wait for
# it to completely finish.
wrapper.async_stop
wrapper.join

# When running a wrapper with :use_current_ractor, you do not need to
# recover the object, because it was never moved. The recover_object
# method is not available.
#     db2 = wrapper.recover_object  # <= raises Ractor::Wrapper::Error

Features

  • Provides a Ractor-shareable method interface to a non-shareable object.
  • Supports arbitrary method arguments and return values.
  • Can be configured to run in its own isolated Ractor or in a Thread in the current Ractor.
  • Can be configured per method whether to copy or move arguments and return values.
  • Blocks can be run in the calling Ractor or in the object Ractor.
  • Raises exceptions thrown by the method.
  • Can serialize method calls for non-thread-safe objects, or run methods concurrently in multiple worker threads for thread-safe objects.
  • Can gracefully shut down the wrapper and retrieve the original object.

Caveats

  • Certain types cannot be used as method arguments or return values because they cannot be moved between Ractors. As of Ruby 4.0.0, these include threads, backtraces, procs, and a few others.
  • As of Ruby 4.0.0, any exceptions raised are always copied (rather than moved) back to the calling Ractor, and the backtrace is cleared out. This is due to https://bugs.ruby-lang.org/issues/21818
  • Blocks can be run "in place" (i.e. in the wrapped object context) only if the block does not access any data outside the block. Otherwise, the block must be run in caller's context.
  • Blocks configured to run in the caller's context can only be run while a method is executing. They cannot be "saved" as a proc to be run later unless they are configured to run "in place". In particular, using blocks as a syntax to define callbacks can generally not be done through a wrapper.

Defined Under Namespace

Classes: Configuration, CrashedError, Error, MethodSettings, StoppedError, Stub

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(object, use_current_ractor: false, name: nil, threads: 0, arguments: nil, results: nil, block_arguments: nil, block_results: nil, block_environment: nil, enable_logging: false) {|config| ... } ⇒ Wrapper

Create a wrapper around the given object.

If you pass an optional block, a Configuration object will be yielded to it, allowing additional configuration before the wrapper starts. In particular, per-method configuration must be set in this block. Block-provided settings override keyword arguments.

See Configuration for more information about the method communication and block settings.

Parameters:

  • object (Object)

    The non-shareable object to wrap.

  • use_current_ractor (boolean) (defaults to: false)

    If true, the wrapper is run in a thread in the current Ractor instead of spawning a new Ractor (the default behavior). This option can be used if the wrapped object cannot be moved or must run in the main Ractor. Can also be set via the configuration block.

  • name (String) (defaults to: nil)

    A name for this wrapper. Used during logging. Can also be set via the configuration block. Defaults to the object_id.

  • threads (Integer) (defaults to: 0)

    The number of worker threads to run. Defaults to 0, which causes the wrapper to run sequentially without spawning workers. Can also be set via the configuration block.

  • arguments (:move, :copy) (defaults to: nil)

    How to communicate method arguments by default. If not specified, defaults to :copy.

  • results (:move, :copy, :void) (defaults to: nil)

    How to communicate method return values by default. If not specified, defaults to :copy.

  • block_arguments (:move, :copy) (defaults to: nil)

    How to communicate block arguments by default. If not specified, defaults to :copy.

  • block_results (:move, :copy, :void) (defaults to: nil)

    How to communicate block result values by default. If not specified, defaults to :copy.

  • block_environment (:caller, :wrapped) (defaults to: nil)

    How to execute blocks, and what scope blocks have access to. If not specified, defaults to :caller.

  • enable_logging (boolean) (defaults to: false)

    Set to true to enable logging. Default is false. Can also be set via the configuration block.

Yields:

  • (config)

    An optional configuration block.

Yield Parameters:

Raises:

  • (::Ractor::MovedError)


525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
# File 'lib/ractor/wrapper.rb', line 525

def initialize(object,
               use_current_ractor: false,
               name: nil,
               threads: 0,
               arguments: nil,
               results: nil,
               block_arguments: nil,
               block_results: nil,
               block_environment: nil,
               enable_logging: false)
  raise ::Ractor::MovedError, "cannot wrap a moved object" if ::Ractor::MovedObject === object

  config = Configuration.new
  config.name = name || object_id.to_s
  config.enable_logging = enable_logging
  config.threads = threads
  config.use_current_ractor = use_current_ractor
  config.configure_method(arguments: arguments,
                          results: results,
                          block_arguments: block_arguments,
                          block_results: block_results,
                          block_environment: block_environment)
  yield config if block_given?

  @name = config.name
  @enable_logging = config.enable_logging
  @threads = config.threads
  @method_settings = config.final_method_settings
  @stub = Stub.new(self)

  if config.use_current_ractor
    setup_local_server(object)
  else
    setup_isolated_server(object)
  end
end

Instance Attribute Details

#nameString (readonly)

Return the name of this wrapper.

Returns:

  • (String)


567
568
569
# File 'lib/ractor/wrapper.rb', line 567

def name
  @name
end

#stubRactor::Wrapper::Stub (readonly)

Return the wrapper stub. This is an object that responds to the same methods as the wrapped object, providing an easy way to call a wrapper.



613
614
615
# File 'lib/ractor/wrapper.rb', line 613

def stub
  @stub
end

#threadsInteger (readonly)

Return the number of worker threads used by the wrapper.

Returns:

  • (Integer)


592
593
594
# File 'lib/ractor/wrapper.rb', line 592

def threads
  @threads
end

Instance Method Details

#async_stopself

Request that the wrapper stop. All currently running calls will complete before the wrapper actually terminates. However, any new calls will fail.

This method is idempotent and can be called multiple times (even from different ractors).

Returns:

  • (self)


667
668
669
670
671
672
673
674
# File 'lib/ractor/wrapper.rb', line 667

def async_stop
  maybe_log("Stopping wrapper")
  @port.send(StopMessage.new.freeze)
  self
rescue ::Ractor::ClosedError
  # Ignore to allow stops to be idempotent.
  self
end

#call(method_name, *args, **kwargs) ⇒ Object

A lower-level interface for calling methods through the wrapper.

Parameters:

  • method_name (Symbol)

    The name of the method to call

  • args (arguments)

    The positional arguments

  • kwargs (keywords)

    The keyword arguments

Returns:

  • (Object)

    The return value



623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
# File 'lib/ractor/wrapper.rb', line 623

def call(method_name, *args, **kwargs, &)
  reply_port = ::Ractor::Port.new
  transaction = make_transaction
  settings = method_settings(method_name)
  block_arg = make_block_arg(settings, &)
  message = CallMessage.new(method_name: method_name,
                            args: args,
                            kwargs: kwargs,
                            block_arg: block_arg,
                            transaction: transaction,
                            settings: settings,
                            reply_port: reply_port)
  maybe_log("Sending method", method_name: method_name, transaction: transaction)
  begin
    @port.send(message, move: settings.arguments == :move)
  rescue ::Ractor::ClosedError
    raise StoppedError, "Wrapper has stopped"
  end
  loop do
    reply_message = reply_port.receive
    case reply_message
    when YieldMessage
      handle_yield(reply_message, transaction, settings, method_name, &)
    when ReturnMessage
      maybe_log("Received result", method_name: method_name, transaction: transaction)
      return reply_message.value
    when ExceptionMessage
      maybe_log("Received exception", method_name: method_name, transaction: transaction)
      raise reply_message.exception
    end
  end
ensure
  reply_port.close
end

#enable_logging?Boolean

Return whether logging is enabled for this wrapper.

Returns:

  • (Boolean)


583
584
585
# File 'lib/ractor/wrapper.rb', line 583

def enable_logging?
  @enable_logging
end

#joinself

Blocks until the wrapper has fully stopped.

Unlike Thread#join and Ractor#join, if a Wrapper crashes, the exception generally does not get raised out of Wrapper#join. Instead, it just returns self in the same way as normal termination.

Returns:

  • (self)


685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
# File 'lib/ractor/wrapper.rb', line 685

def join
  if @ractor
    @ractor.join
  else
    reply_port = ::Ractor::Port.new
    begin
      @port.send(JoinMessage.new(reply_port))
      reply_port.receive
    rescue ::Ractor::ClosedError
      # Assume the wrapper has stopped if the port is not sendable
    ensure
      reply_port.close
    end
  end
  self
end

#method_settings(method_name) ⇒ MethodSettings

Return the method settings for the given method name. This returns the default method settings if the given method is not configured explicitly by name.

Parameters:

  • method_name (Symbol, nil)

    The method name, or nil to return the defaults.

Returns:



603
604
605
# File 'lib/ractor/wrapper.rb', line 603

def method_settings(method_name)
  (method_name && @method_settings[method_name.to_sym]) || @method_settings[nil]
end

#recover_objectObject

Retrieves the original object that was wrapped. This should be called only after a stop request has been issued using #async_stop, and may block until the wrapper has fully stopped.

This can be called only if the wrapper was not configured with use_current_ractor: true. If the wrapper had that configuration, the object will not be moved, and does not need to be recovered. In such a case, any calls to this method will raise Ractor::Error.

Only one ractor may call this method; any additional calls will fail with a Ractor::Wrapper::Error.

Returns:

  • (Object)

    The original wrapped object

Raises:



717
718
719
720
721
722
723
724
# File 'lib/ractor/wrapper.rb', line 717

def recover_object
  raise Error, "cannot recover an object from a local wrapper" unless @ractor
  begin
    @ractor.value
  rescue ::Ractor::Error => e
    raise ::Ractor::Wrapper::Error, e.message, cause: e
  end
end

#use_current_ractor?boolean

Determine whether this wrapper runs in the current Ractor

Returns:

  • (boolean)


574
575
576
# File 'lib/ractor/wrapper.rb', line 574

def use_current_ractor?
  @ractor.nil?
end