Module: Concurrent::Async
- Defined in:
- lib/concurrent/async.rb
Overview
This module depends on several internal synchronization objects that must be initialized prior to calling any of the async/await/executor methods. The best practice is to call ‘init_mutex` from within the constructor of the including class. A less ideal but acceptable practice is for the thread creating the asynchronous object to explicitly call the `init_mutex` method prior to calling any of the async/await/executor methods. If `init_mutex` is not called explicitly the async/await/executor methods will raize a `Concurrent::InitializationError`. This is the only way thread-safe initialization can be guaranteed.
Thread safe guarantees can only be made when asynchronous method calls are not mixed with synchronous method calls. Use only synchronous calls when the object is used exclusively on a single thread. Use only ‘async` and `await` when the object is shared between threads. Once you call a method using `async`, you should no longer call any methods directly on the object. Use `async` and `await` exclusively from then on. With careful programming it is possible to switch back and forth but it’s also very easy to create race conditions and break your application. Basically, it’s “async all the way down.”
A mixin module that provides simple asynchronous behavior to any standard class/object or object.
Scenario:
As a stateful, plain old Ruby class/object
I want safe, asynchronous behavior
So my long-running methods don't block the main thread
Stateful, mutable objects must be managed carefully when used asynchronously. But Ruby is an object-oriented language so designing with objects and classes plays to Ruby’s strengths and is often more natural to many Ruby programmers. The ‘Async` module is a way to mix simple yet powerful asynchronous capabilities into any plain old Ruby object or class. These capabilities provide a reasonable level of thread safe guarantees when used correctly.
When this module is mixed into a class or object it provides to new methods: ‘async` and `await`. These methods are thread safe with respect to the enclosing object. The former method allows methods to be called asynchronously by posting to the global thread pool. The latter allows a method to be called synchronously on the current thread but does so safely with respect to any pending asynchronous method calls. Both methods return an `Obligation` which can be inspected for the result of the method call. Calling a method with `async` will return a `:pending` `Obligation` whereas `await` will return a `:complete` `Obligation`.
Very loosely based on the ‘async` and `await` keywords in C#.
Class Method Summary collapse
-
.validate_argc(obj, method, *args) ⇒ Object
Check for the presence of a method on an object and determine if a given set of arguments matches the required arity.
Instance Method Summary collapse
-
#async ⇒ Concurrent::Future
(also: #future)
Causes the chained method call to be performed asynchronously on the global thread pool.
-
#await ⇒ Concurrent::IVar
(also: #delay)
Causes the chained method call to be performed synchronously on the current thread.
-
#executor=(executor) ⇒ Object
Set a new executor.
-
#init_mutex ⇒ Object
Initialize the internal mutex and other synchronization objects.
Class Method Details
.validate_argc(obj, method, *args) ⇒ Object
This check is imperfect because of the way Ruby reports the arity of methods with a variable number of arguments. It is possible to determine if too few arguments are given but impossible to determine if too many arguments are given. This check may also fail to recognize dynamic behavior of the object, such as methods simulated with ‘method_missing`.
Check for the presence of a method on an object and determine if a given set of arguments matches the required arity.
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/concurrent/async.rb', line 115 def validate_argc(obj, method, *args) argc = args.length arity = obj.method(method).arity if arity >= 0 && argc != arity raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})") elsif arity < 0 && (arity = (arity + 1).abs) > argc raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)") end end |
Instance Method Details
#async ⇒ Concurrent::Future Also known as: future
The method call is guaranteed to be thread safe with respect to all other method calls against the same object that are called with either ‘async` or `await`. The mutable nature of Ruby references (and object orientation in general) prevent any other thread safety guarantees. Do NOT mix non-protected method calls with protected method call. Use only protected method calls when sharing the object between threads.
Causes the chained method call to be performed asynchronously on the global thread pool. The method called by this method will return a ‘Future` object in the `:pending` state and the method call will have been scheduled on the global thread pool. The final disposition of the method call can be obtained by inspecting the returned `Future`.
Before scheduling the method on the global thread pool a best-effort attempt will be made to validate that the method exists on the object and that the given arguments match the arity of the requested function. Due to the dynamic nature of Ruby and limitations of its reflection library, some edge cases will be missed. For more information see the documentation for the ‘validate_argc` method.
248 249 250 251 |
# File 'lib/concurrent/async.rb', line 248 def async raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__ @__async_delegator__.value end |
#await ⇒ Concurrent::IVar Also known as: delay
The method call is guaranteed to be thread safe with respect to all other method calls against the same object that are called with either ‘async` or `await`. The mutable nature of Ruby references (and object orientation in general) prevent any other thread safety guarantees. Do NOT mix non-protected method calls with protected method call. Use only protected method calls when sharing the object between threads.
Causes the chained method call to be performed synchronously on the current thread. The method called by this method will return an ‘IVar` object in either the `:fulfilled` or `rejected` state and the method call will have completed. The final disposition of the method call can be obtained by inspecting the returned `IVar`.
Before scheduling the method on the global thread pool a best-effort attempt will be made to validate that the method exists on the object and that the given arguments match the arity of the requested function. Due to the dynamic nature of Ruby and limitations of its reflection library, some edge cases will be missed. For more information see the documentation for the ‘validate_argc` method.
282 283 284 285 |
# File 'lib/concurrent/async.rb', line 282 def await raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__ @__await_delegator__.value end |
#executor=(executor) ⇒ Object
Set a new executor
292 293 294 295 296 |
# File 'lib/concurrent/async.rb', line 292 def executor=(executor) raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__ @__async__executor__.reconfigure { executor } or raise ArgumentError.new('executor has already been set') end |
#init_mutex ⇒ Object
This method must be called from the constructor of the including class or explicitly by the caller prior to calling any other methods. This is the only way thread-safe initialization can be guaranteed.
Initialize the internal mutex and other synchronization objects. This method must be called from the constructor of the including class or explicitly by the caller prior to calling any other methods. If ‘init_mutex` is not called explicitly the async/await/executor methods will raize a `Concurrent::InitializationError`. This is the only way thread-safe initialization can be guaranteed.
310 311 312 313 314 315 316 317 |
# File 'lib/concurrent/async.rb', line 310 def init_mutex raise InitializationError.new('#init_mutex was already called') if @__async__mutex__ (@__async__mutex__ = Mutex.new).lock @__async__executor__ = Delay.new{ Concurrent.configuration.global_operation_pool } @__await_delegator__ = Delay.new{ AwaitDelegator.new(self, @__async__mutex__) } @__async_delegator__ = Delay.new{ AsyncDelegator.new(self, @__async__executor__, @__async__mutex__) } @__async__mutex__.unlock end |