Class: Ractor::Wrapper
- Inherits:
-
Object
- Object
- Ractor::Wrapper
- Defined in:
- lib/ractor/wrapper.rb
Overview
An experimental class that wraps a non-shareable object in an actor, allowing multiple Ractors to access it concurrently.
WARNING: This is a highly experimental library, and currently not recommended for production use. (As of Ruby 4.0.0, the same can be said of Ractors in general.)
What is Ractor::Wrapper?
For the most part, unless an object is sharable, which generally means deeply immutable along with a few other restrictions, it cannot be accessed directly from another Ractor. This makes it difficult for multiple Ractors to share a resource that is stateful. Such a resource must typically itself be implemented as a Ractor and accessed via message passing.
Ractor::Wrapper makes it possible for an ordinary non-shareable object to be accessed from multiple Ractors. It does this by "wrapping" the object with an actor that listens for messages and invokes the object's methods in a controlled single-Ractor environment. It then provides a stub object that reproduces the interface of the original object, but responds to method calls by sending messages to the wrapper. Ractor::Wrapper can be used to implement simple actors by writing "plain" Ruby objects, or to adapt existing non-shareable objects to a multi-Ractor world.
Net::HTTP example
The following example shows how to share a single Net::HTTP session object among multiple Ractors.
require "ractor/wrapper"
require "net/http"
# Create a Net::HTTP session. Net::HTTP sessions are not shareable,
# so normally only one Ractor can access them at a time.
http = Net::HTTP.new("example.com")
http.start
# Create a wrapper around the session. This moves the session into an
# internal Ractor and listens for method call requests. By default, a
# wrapper serializes calls, handling one at a time, for compatibility
# with non-thread-safe objects.
wrapper = Ractor::Wrapper.new(http)
# At this point, the session object can no longer be accessed directly
# because it is now owned by the wrapper's internal Ractor.
# http.get("/whoops") # <= raises Ractor::MovedError
# However, you can access the session via the stub object provided by
# the wrapper. This stub proxies the call to the wrapper's internal
# Ractor. And it's shareable, so any number of Ractors can use it.
response = wrapper.stub.get("/")
# Here, we start two Ractors, and pass the stub to each one. Each
# Ractor can simply call methods on the stub as if it were the original
# connection object. Internally, of course, the calls are proxied to
# the original object via the wrapper, and execution is serialized.
r1 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.get("/hello")
end
:ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.get("/ruby")
end
:ok
end
# Wait for the two above Ractors to finish.
r1.join
r2.join
# After you stop the wrapper, you can retrieve the underlying session
# object and access it directly again.
wrapper.async_stop
http = wrapper.recover_object
http.finish
SQLite3 example
The following example shows how to share a SQLite3 database among multiple Ractors.
require "ractor/wrapper"
require "sqlite3"
# Create a SQLite3 database. These objects are not shareable, so
# normally only one Ractor can access them.
db = SQLite3::Database.new($my_database_path)
# Create a wrapper around the database. A SQLite3::Database object
# cannot be moved between Ractors, so we configure the wrapper to run
# in the current Ractor. We can also configure it to run multiple
# worker threads because the database object itself is thread-safe.
wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2)
# At this point, the database object can still be accessed directly
# because it hasn't been moved to a different Ractor.
rows = db.execute("select * from numbers")
# You can also access the database via the stub object provided by the
# wrapper.
rows = wrapper.stub.execute("select * from numbers")
# Here, we start two Ractors, and pass the stub to each one. The
# wrapper's worker threads will handle the requests concurrently.
r1 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.execute("select * from numbers")
end
:ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.execute("select * from numbers")
end
:ok
end
# Wait for the two above Ractors to finish.
r1.join
r2.join
# After stopping the wrapper, you can call the join method to wait for
# it to completely finish.
wrapper.async_stop
wrapper.join
# When running a wrapper with :use_current_ractor, you do not need to
# recover the object, because it was never moved. The recover_object
# method is not available.
# db2 = wrapper.recover_object # <= raises Ractor::Wrapper::Error
Features
- Provides a Ractor-shareable method interface to a non-shareable object.
- Supports arbitrary method arguments and return values.
- Can be configured to run in its own isolated Ractor or in a Thread in the current Ractor.
- Can be configured per method whether to copy or move arguments and return values.
- Blocks can be run in the calling Ractor or in the object Ractor.
- Raises exceptions thrown by the method.
- Can serialize method calls for non-thread-safe objects, or run methods concurrently in multiple worker threads for thread-safe objects.
- Can gracefully shut down the wrapper and retrieve the original object.
Caveats
- Certain types cannot be used as method arguments or return values because they cannot be moved between Ractors. As of Ruby 4.0.0, these include threads, backtraces, procs, and a few others.
- As of Ruby 4.0.0, any exceptions raised are always copied (rather than moved) back to the calling Ractor, and the backtrace is cleared out. This is due to https://bugs.ruby-lang.org/issues/21818
- Blocks can be run "in place" (i.e. in the wrapped object context) only if the block does not access any data outside the block. Otherwise, the block must be run in caller's context.
- Blocks configured to run in the caller's context can only be run while a method is executing. They cannot be "saved" as a proc to be run later unless they are configured to run "in place". In particular, using blocks as a syntax to define callbacks can generally not be done through a wrapper.
Defined Under Namespace
Classes: Configuration, CrashedError, Error, MethodSettings, StoppedError, Stub
Instance Attribute Summary collapse
-
#name ⇒ String
readonly
Return the name of this wrapper.
-
#stub ⇒ Ractor::Wrapper::Stub
readonly
Return the wrapper stub.
-
#threads ⇒ Integer
readonly
Return the number of worker threads used by the wrapper.
Instance Method Summary collapse
-
#async_stop ⇒ self
Request that the wrapper stop.
-
#call(method_name, *args, **kwargs) ⇒ Object
A lower-level interface for calling methods through the wrapper.
-
#enable_logging? ⇒ Boolean
Return whether logging is enabled for this wrapper.
-
#initialize(object, use_current_ractor: false, name: nil, threads: 0, arguments: nil, results: nil, block_arguments: nil, block_results: nil, block_environment: nil, enable_logging: false) {|config| ... } ⇒ Wrapper
constructor
Create a wrapper around the given object.
-
#join ⇒ self
Blocks until the wrapper has fully stopped.
-
#method_settings(method_name) ⇒ MethodSettings
Return the method settings for the given method name.
-
#recover_object ⇒ Object
Retrieves the original object that was wrapped.
-
#use_current_ractor? ⇒ boolean
Determine whether this wrapper runs in the current Ractor.
Constructor Details
#initialize(object, use_current_ractor: false, name: nil, threads: 0, arguments: nil, results: nil, block_arguments: nil, block_results: nil, block_environment: nil, enable_logging: false) {|config| ... } ⇒ Wrapper
Create a wrapper around the given object.
If you pass an optional block, a Configuration object will be yielded to it, allowing additional configuration before the wrapper starts. In particular, per-method configuration must be set in this block. Block-provided settings override keyword arguments.
See Configuration for more information about the method communication and block settings.
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 |
# File 'lib/ractor/wrapper.rb', line 525 def initialize(object, use_current_ractor: false, name: nil, threads: 0, arguments: nil, results: nil, block_arguments: nil, block_results: nil, block_environment: nil, enable_logging: false) raise ::Ractor::MovedError, "cannot wrap a moved object" if ::Ractor::MovedObject === object config = Configuration.new config.name = name || object_id.to_s config.enable_logging = enable_logging config.threads = threads config.use_current_ractor = use_current_ractor config.configure_method(arguments: arguments, results: results, block_arguments: block_arguments, block_results: block_results, block_environment: block_environment) yield config if block_given? @name = config.name @enable_logging = config.enable_logging @threads = config.threads @method_settings = config.final_method_settings @stub = Stub.new(self) if config.use_current_ractor setup_local_server(object) else setup_isolated_server(object) end end |
Instance Attribute Details
#name ⇒ String (readonly)
Return the name of this wrapper.
567 568 569 |
# File 'lib/ractor/wrapper.rb', line 567 def name @name end |
#stub ⇒ Ractor::Wrapper::Stub (readonly)
Return the wrapper stub. This is an object that responds to the same methods as the wrapped object, providing an easy way to call a wrapper.
613 614 615 |
# File 'lib/ractor/wrapper.rb', line 613 def stub @stub end |
#threads ⇒ Integer (readonly)
Return the number of worker threads used by the wrapper.
592 593 594 |
# File 'lib/ractor/wrapper.rb', line 592 def threads @threads end |
Instance Method Details
#async_stop ⇒ self
Request that the wrapper stop. All currently running calls will complete before the wrapper actually terminates. However, any new calls will fail.
This method is idempotent and can be called multiple times (even from different ractors).
667 668 669 670 671 672 673 674 |
# File 'lib/ractor/wrapper.rb', line 667 def async_stop maybe_log("Stopping wrapper") @port.send(StopMessage.new.freeze) self rescue ::Ractor::ClosedError # Ignore to allow stops to be idempotent. self end |
#call(method_name, *args, **kwargs) ⇒ Object
A lower-level interface for calling methods through the wrapper.
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 |
# File 'lib/ractor/wrapper.rb', line 623 def call(method_name, *args, **kwargs, &) reply_port = ::Ractor::Port.new transaction = make_transaction settings = method_settings(method_name) block_arg = make_block_arg(settings, &) = CallMessage.new(method_name: method_name, args: args, kwargs: kwargs, block_arg: block_arg, transaction: transaction, settings: settings, reply_port: reply_port) maybe_log("Sending method", method_name: method_name, transaction: transaction) begin @port.send(, move: settings.arguments == :move) rescue ::Ractor::ClosedError raise StoppedError, "Wrapper has stopped" end loop do = reply_port.receive case when YieldMessage handle_yield(, transaction, settings, method_name, &) when ReturnMessage maybe_log("Received result", method_name: method_name, transaction: transaction) return .value when ExceptionMessage maybe_log("Received exception", method_name: method_name, transaction: transaction) raise .exception end end ensure reply_port.close end |
#enable_logging? ⇒ Boolean
Return whether logging is enabled for this wrapper.
583 584 585 |
# File 'lib/ractor/wrapper.rb', line 583 def enable_logging? @enable_logging end |
#join ⇒ self
Blocks until the wrapper has fully stopped.
Unlike Thread#join and Ractor#join, if a Wrapper crashes, the
exception generally does not get raised out of Wrapper#join. Instead,
it just returns self in the same way as normal termination.
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 |
# File 'lib/ractor/wrapper.rb', line 685 def join if @ractor @ractor.join else reply_port = ::Ractor::Port.new begin @port.send(JoinMessage.new(reply_port)) reply_port.receive rescue ::Ractor::ClosedError # Assume the wrapper has stopped if the port is not sendable ensure reply_port.close end end self end |
#method_settings(method_name) ⇒ MethodSettings
Return the method settings for the given method name. This returns the default method settings if the given method is not configured explicitly by name.
603 604 605 |
# File 'lib/ractor/wrapper.rb', line 603 def method_settings(method_name) (method_name && @method_settings[method_name.to_sym]) || @method_settings[nil] end |
#recover_object ⇒ Object
Retrieves the original object that was wrapped. This should be called only after a stop request has been issued using #async_stop, and may block until the wrapper has fully stopped.
This can be called only if the wrapper was not configured with
use_current_ractor: true. If the wrapper had that configuration, the
object will not be moved, and does not need to be recovered. In such a
case, any calls to this method will raise Ractor::Error.
Only one ractor may call this method; any additional calls will fail with a Ractor::Wrapper::Error.
717 718 719 720 721 722 723 724 |
# File 'lib/ractor/wrapper.rb', line 717 def recover_object raise Error, "cannot recover an object from a local wrapper" unless @ractor begin @ractor.value rescue ::Ractor::Error => e raise ::Ractor::Wrapper::Error, e., cause: e end end |
#use_current_ractor? ⇒ boolean
Determine whether this wrapper runs in the current Ractor
574 575 576 |
# File 'lib/ractor/wrapper.rb', line 574 def use_current_ractor? @ractor.nil? end |