Class: Ractor::Wrapper
- Inherits:
-
Object
- Object
- Ractor::Wrapper
- Defined in:
- lib/ractor/wrapper.rb
Overview
An experimental class that wraps a non-shareable object in an actor, allowing multiple Ractors to access it concurrently.
WARNING: This is a highly experimental library, and currently not recommended for production use. (As of Ruby 4.0.0, the same can be said of Ractors in general.)
What is Ractor::Wrapper?
For the most part, unless an object is sharable, which generally means deeply immutable along with a few other restrictions, it cannot be accessed directly from another Ractor. This makes it difficult for multiple Ractors to share a resource that is stateful. Such a resource must typically itself be implemented as a Ractor and accessed via message passing.
Ractor::Wrapper makes it possible for an ordinary non-shareable object to be accessed from multiple Ractors. It does this by "wrapping" the object with an actor that listens for messages and invokes the object's methods in a controlled single-Ractor environment. It then provides a stub object that reproduces the interface of the original object, but responds to method calls by sending messages to the wrapper. Ractor::Wrapper can be used to implement simple actors by writing "plain" Ruby objects, or to adapt existing non-shareable objects to a multi-Ractor world.
Net::HTTP example
The following example shows how to share a single Net::HTTP session object among multiple Ractors.
require "ractor/wrapper"
require "net/http"
# Create a Net::HTTP session. Net::HTTP sessions are not shareable,
# so normally only one Ractor can access them at a time.
http = Net::HTTP.new("example.com")
http.start
# Create a wrapper around the session. This moves the session into an
# internal Ractor and listens for method call requests. By default, a
# wrapper serializes calls, handling one at a time, for compatibility
# with non-thread-safe objects.
wrapper = Ractor::Wrapper.new(http)
# At this point, the session object can no longer be accessed directly
# because it is now owned by the wrapper's internal Ractor.
# http.get("/whoops") # <= raises Ractor::MovedError
# However, you can access the session via the stub object provided by
# the wrapper. This stub proxies the call to the wrapper's internal
# Ractor. And it's shareable, so any number of Ractors can use it.
response = wrapper.stub.get("/")
# Here, we start two Ractors, and pass the stub to each one. Each
# Ractor can simply call methods on the stub as if it were the original
# connection object. Internally, of course, the calls are proxied to
# the original object via the wrapper, and execution is serialized.
r1 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.get("/hello")
end
:ok
end
r2 = Ractor.new(wrapper.stub) do |stub|
5.times do
stub.get("/ruby")
end
:ok
end
# Wait for the two above Ractors to finish.
r1.join
r2.join
# After you stop the wrapper, you can retrieve the underlying session
# object and access it directly again.
wrapper.async_stop
http = wrapper.recover_object
http.finish
SQLite3 example
The following example shows how to share a SQLite3 database among multiple Ractors.
require "ractor/wrapper"
require "sqlite3"
# Create a SQLite3 database. These objects are not shareable, so
# normally only one Ractor can access them.
db = SQLite3::Database.new($my_database_path)
# Create a wrapper around the database. A SQLite3::Database object
# cannot be moved between Ractors, so we configure the wrapper to run
# in the current Ractor. You can also configure it to run multiple
# worker threads because the database object itself is thread-safe.
wrapper = Ractor::Wrapper.new(db, use_current_ractor: true, threads: 2)
# At this point, the database object can still be accessed directly
# because it hasn't been moved to a different Ractor.
rows = db.execute("select * from numbers")
# You can also access the database via the stub object provided by the
# wrapper.
rows = wrapper.stub.execute("select * from numbers")
# Here, we start two Ractors, and pass the stub to each one. The
# wrapper's two worker threads will handle the requests in the order
# received.
r1 = Ractor.new(wrapper.stub) do |db_stub|
5.times do
rows = db_stub.execute("select * from numbers")
end
:ok
end
r2 = Ractor.new(wrapper.stub) do |db_stub|
5.times do
rows = db_stub.execute("select * from numbers")
end
:ok
end
# Wait for the two above Ractors to finish.
r1.join
r2.join
# After stopping the wrapper, you can call the join method to wait for
# it to completely finish.
wrapper.async_stop
wrapper.join
# When running a wrapper with :use_current_ractor, you do not need to
# recover the object, because it was never moved. The recover_object
# method is not available.
# db2 = wrapper.recover_object # <= raises Ractor::Error
Features
- Provides a Ractor-shareable method interface to a non-shareable object.
- Supports arbitrary method arguments and return values.
- Can be configured to run in its own isolated Ractor or in a Thread in the current Ractor.
- Can be configured per method whether to copy or move arguments and return values.
- Blocks can be run in the calling Ractor or in the object Ractor.
- Raises exceptions thrown by the method.
- Can serialize method calls for non-thread-safe objects, or run methods concurrently in multiple worker threads for thread-safe objects.
- Can gracefully shut down the wrapper and retrieve the original object.
Caveats
- Certain types cannot be used as method arguments or return values because they cannot be moved between Ractors. As of Ruby 4.0.0, these include threads, backtraces, procs, and a few others.
- As of Ruby 4.0.0, any exceptions raised are always copied (rather than moved) back to the calling Ractor, and the backtrace is cleared out. This is due to https://bugs.ruby-lang.org/issues/21818
- Blocks can be run "in place" (i.e. in the wrapped object context) only if the block does not access any data outside the block. Otherwise, the block must be run in caller's context.
- Blocks configured to run in the caller's context can only be run while a method is executing. They cannot be "saved" as a proc to be run later unless they are configured to run "in place". In particular, using blocks as a syntax to define callbacks can generally not be done through a wrapper.
Defined Under Namespace
Classes: MethodSettings, Stub
Instance Attribute Summary collapse
-
#name ⇒ String
Return the name of this wrapper.
-
#stub ⇒ Ractor::Wrapper::Stub
readonly
Return the wrapper stub.
-
#threads ⇒ Integer
Return the number of worker threads used by the wrapper.
Instance Method Summary collapse
-
#async_stop ⇒ self
Request that the wrapper stop.
-
#call(method_name, *args, **kwargs) ⇒ Object
A lower-level interface for calling methods through the wrapper.
-
#configure_method(method_name = nil, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil) ⇒ Object
Configure the move semantics for the given method (or the default settings if no method name is given.) That is, determine whether arguments, return values, and/or exceptions are copied or moved when communicated with the wrapper.
-
#enable_logging=(value) ⇒ Object
Enable or disable internal debug logging.
-
#enable_logging? ⇒ Boolean
Return whether logging is enabled for this wrapper.
-
#initialize(object, use_current_ractor: false, name: nil, threads: 0, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil, enable_logging: false) {|_self| ... } ⇒ Wrapper
constructor
Create a wrapper around the given object.
-
#join ⇒ self
Blocks until the wrapper has fully stopped.
-
#method_settings(method_name) ⇒ MethodSettings
Return the method settings for the given method name.
-
#recover_object ⇒ Object
Retrieves the original object that was wrapped.
-
#use_current_ractor? ⇒ boolean
Determine whether this wrapper runs in the current Ractor.
Constructor Details
#initialize(object, use_current_ractor: false, name: nil, threads: 0, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil, enable_logging: false) {|_self| ... } ⇒ Wrapper
Create a wrapper around the given object.
If you pass an optional block, the wrapper itself will be yielded to it, at which time you can set additional configuration options. In particular, method-specific configuration must be set in this block. The configuration is frozen once the object is constructed.
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/ractor/wrapper.rb', line 313 def initialize(object, use_current_ractor: false, name: nil, threads: 0, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil, enable_logging: false) raise ::Ractor::MovedError, "cannot wrap a moved object" if ::Ractor::MovedObject === object @method_settings = {} self.name = name || object_id.to_s self.enable_logging = enable_logging self.threads = threads configure_method(move_data: move_data, move_arguments: move_arguments, move_results: move_results, move_block_arguments: move_block_arguments, move_block_results: move_block_results, execute_blocks_in_place: execute_blocks_in_place) yield self if block_given? @method_settings.freeze if use_current_ractor setup_local_server(object) else setup_isolated_server(object) end @stub = Stub.new(self) freeze end |
Instance Attribute Details
#name ⇒ String
Return the name of this wrapper.
444 445 446 |
# File 'lib/ractor/wrapper.rb', line 444 def name @name end |
#stub ⇒ Ractor::Wrapper::Stub (readonly)
Return the wrapper stub. This is an object that responds to the same methods as the wrapped object, providing an easy way to call a wrapper.
491 492 493 |
# File 'lib/ractor/wrapper.rb', line 491 def stub @stub end |
#threads ⇒ Integer
Return the number of worker threads used by the wrapper.
469 470 471 |
# File 'lib/ractor/wrapper.rb', line 469 def threads @threads end |
Instance Method Details
#async_stop ⇒ self
Request that the wrapper stop. All currently running calls will complete before the wrapper actually terminates. However, any new calls will fail.
This method is idempotent and can be called multiple times (even from different ractors).
541 542 543 544 545 546 547 548 |
# File 'lib/ractor/wrapper.rb', line 541 def async_stop maybe_log("Stopping wrapper") @port.send(StopMessage.new.freeze) self rescue ::Ractor::ClosedError # Ignore to allow stops to be idempotent. self end |
#call(method_name, *args, **kwargs) ⇒ Object
A lower-level interface for calling methods through the wrapper.
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 |
# File 'lib/ractor/wrapper.rb', line 501 def call(method_name, *args, **kwargs, &) reply_port = ::Ractor::Port.new transaction = ::Random.rand(7_958_661_109_946_400_884_391_936).to_s(36).freeze settings = method_settings(method_name) block_arg = make_block_arg(settings, &) = CallMessage.new(method_name: method_name, args: args, kwargs: kwargs, block_arg: block_arg, transaction: transaction, settings: settings, reply_port: reply_port) maybe_log("Sending method", method_name: method_name, transaction: transaction) @port.send(, move: settings.move_arguments?) loop do = reply_port.receive case when YieldMessage handle_yield(, transaction, settings, method_name, &) when ReturnMessage maybe_log("Received result", method_name: method_name, transaction: transaction) reply_port.close return .value when ExceptionMessage maybe_log("Received exception", method_name: method_name, transaction: transaction) reply_port.close raise .exception end end end |
#configure_method(method_name = nil, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil) ⇒ Object
Configure the move semantics for the given method (or the default settings if no method name is given.) That is, determine whether arguments, return values, and/or exceptions are copied or moved when communicated with the wrapper. By default, all objects are copied.
This method can be called only during an initialization block. All settings are frozen once the wrapper is active.
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/ractor/wrapper.rb', line 422 def configure_method(method_name = nil, move_data: false, move_arguments: nil, move_results: nil, move_block_arguments: nil, move_block_results: nil, execute_blocks_in_place: nil) method_name = method_name.to_sym unless method_name.nil? @method_settings[method_name] = MethodSettings.new(move_data: move_data, move_arguments: move_arguments, move_results: move_results, move_block_arguments: move_block_arguments, move_block_results: move_block_results, execute_blocks_in_place: execute_blocks_in_place) end |
#enable_logging=(value) ⇒ Object
Enable or disable internal debug logging.
This method can be called only during an initialization block. All settings are frozen once the wrapper is active.
375 376 377 |
# File 'lib/ractor/wrapper.rb', line 375 def enable_logging=(value) @enable_logging = value ? true : false end |
#enable_logging? ⇒ Boolean
Return whether logging is enabled for this wrapper.
460 461 462 |
# File 'lib/ractor/wrapper.rb', line 460 def enable_logging? @enable_logging end |
#join ⇒ self
Blocks until the wrapper has fully stopped.
555 556 557 558 559 560 561 562 563 564 565 566 567 |
# File 'lib/ractor/wrapper.rb', line 555 def join if @ractor @ractor.join else reply_port = ::Ractor::Port.new @port.send(JoinMessage.new(reply_port)) reply_port.receive reply_port.close end self rescue ::Ractor::ClosedError self end |
#method_settings(method_name) ⇒ MethodSettings
Return the method settings for the given method name. This returns the default method settings if the given method is not configured explicitly by name.
480 481 482 483 |
# File 'lib/ractor/wrapper.rb', line 480 def method_settings(method_name) method_name = method_name.to_sym @method_settings[method_name] || @method_settings[nil] end |
#recover_object ⇒ Object
Retrieves the original object that was wrapped. This should be called only after a stop request has been issued using #async_stop, and may block until the wrapper has fully stopped.
This can be called only if the wrapper was not configured with
use_current_ractor: true. If the wrapper had that configuration, the
object will not be moved, and does not need to be recovered. In such a
case, any calls to this method will raise Ractor::Error.
Only one ractor may call this method; any additional calls will fail with a Ractor::Error.
584 585 586 587 |
# File 'lib/ractor/wrapper.rb', line 584 def recover_object raise ::Ractor::Error, "cannot recover an object from a local wrapper" unless @ractor @ractor.value end |
#use_current_ractor? ⇒ boolean
Determine whether this wrapper runs in the current Ractor
451 452 453 |
# File 'lib/ractor/wrapper.rb', line 451 def use_current_ractor? @ractor.nil? end |