Module: Concurrent::Async

Defined in:
lib/concurrent/async.rb

Overview

Note:

This module depends on several internal synchronization objects that must be initialized before calling any of the async/await/executor methods. The best practice is to call `init_mutex` from within the constructor of the including class. A less ideal but acceptable practice is for the thread creating the asynchronous object to call `init_mutex` explicitly before calling any of the async/await/executor methods. If `init_mutex` has not been called, the async/await/executor methods will raise a `Concurrent::InitializationError`. Calling `init_mutex` up front is the only way thread-safe initialization can be guaranteed.

Note:

Thread-safety guarantees can only be made when asynchronous method calls are not mixed with synchronous method calls. Use only synchronous calls when the object is used exclusively on a single thread. Use only `async` and `await` when the object is shared between threads. Once you call a method using `async`, you should no longer call any methods directly on the object. Use `async` and `await` exclusively from then on. With careful programming it is possible to switch back and forth, but it is also very easy to create race conditions and break your application. Basically, it’s “async all the way down.”

A mixin module that provides simple asynchronous behavior to any standard class or object.

Scenario:
  As a stateful, plain old Ruby class/object
  I want safe, asynchronous behavior
  So my long-running methods don't block the main thread

Stateful, mutable objects must be managed carefully when used asynchronously. But Ruby is an object-oriented language, so designing with objects and classes plays to Ruby’s strengths and is often more natural to many Ruby programmers. The `Async` module is a way to mix simple yet powerful asynchronous capabilities into any plain old Ruby object or class. These capabilities provide a reasonable level of thread-safety guarantees when used correctly.

When this module is mixed into a class or object it provides two new methods: `async` and `await`. These methods are thread safe with respect to the enclosing object. The former allows methods to be called asynchronously by posting to the global thread pool. The latter allows a method to be called synchronously on the current thread, but does so safely with respect to any pending asynchronous method calls. Both methods return an `Obligation` which can be inspected for the result of the method call. Calling a method with `async` will return a `:pending` `Obligation`, whereas `await` will return a completed `Obligation` (`:fulfilled` or `:rejected`).
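The idea of serializing every protected call through one shared lock can be sketched with the standard library alone. This is a minimal illustration, not the module's implementation: `SketchAsync`, `Proxy`, `Outcome`, and the `sketch_*` methods are hypothetical names, a fresh `Thread` stands in for the global thread pool, and the returned `Thread` and `Outcome` objects stand in for the `Obligation`.

```ruby
# Illustrative sketch only: SketchAsync, Proxy, and Outcome are hypothetical
# names and are not part of the library.
module SketchAsync
  Outcome = Struct.new(:state, :value, :reason)

  # Forwards every call to the target while holding the shared mutex.
  class Proxy
    def initialize(target, mutex, background)
      @target = target
      @mutex = mutex
      @background = background
    end

    def method_missing(name, *args, &block)
      work = proc do
        @mutex.synchronize do
          begin
            Outcome.new(:fulfilled, @target.public_send(name, *args, &block), nil)
          rescue StandardError => e
            Outcome.new(:rejected, nil, e)
          end
        end
      end
      # Background calls run on a new thread; others run on the caller's thread.
      @background ? Thread.new(&work) : work.call
    end

    def respond_to_missing?(name, include_private = false)
      @target.respond_to?(name, include_private) || super
    end
  end

  # Plays the role of init_mutex: must run before the proxies are used.
  def sketch_init
    @sketch_mutex = Mutex.new
  end

  def sketch_async
    Proxy.new(self, @sketch_mutex, true)
  end

  def sketch_await
    Proxy.new(self, @sketch_mutex, false)
  end
end

class Counter
  include SketchAsync
  attr_reader :count

  def initialize
    sketch_init
    @count = 0
  end

  # Deliberately non-atomic read-modify-write.
  def increment
    current = @count
    sleep(0.001)
    @count = current + 1
  end
end

counter = Counter.new
workers = 10.times.map { counter.sketch_async.increment }
workers.each(&:join)
final = counter.sketch_await.count
puts "#{final.state}: #{final.value}" # prints "fulfilled: 10"
```

Because every call made through either proxy takes the same mutex, the ten increments cannot interleave. Calling `counter.increment` directly would bypass the lock, which is the kind of mixing the note above warns against.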

Very loosely based on the `async` and `await` keywords in C#.

Examples:

Defining an asynchronous class

class Echo
  include Concurrent::Async

  def initialize
    init_mutex # initialize the internal synchronization objects
  end

  def echo(msg)
    sleep(rand)
    print "#{msg}\n"
    nil
  end
end

horn = Echo.new
horn.echo('zero') # synchronous, not thread-safe

horn.async.echo('one') # asynchronous, non-blocking, thread-safe
horn.await.echo('two') # synchronous, blocking, thread-safe

Monkey-patching an existing object

numbers = 1_000_000.times.collect{ rand }
numbers.extend(Concurrent::Async)
numbers.init_mutex # initialize the internal synchronization objects

future = numbers.async.max
future.state #=> :pending

sleep(2)

future.state #=> :fulfilled
future.value #=> 0.999999138918843

Since:

  • 0.6.0

Class Method Details

.validate_argc(obj, method, *args) ⇒ Object

Note:

This check is imperfect because of the way Ruby reports the arity of methods with a variable number of arguments. It is possible to determine if too few arguments are given but impossible to determine if too many arguments are given. This check may also fail to recognize dynamic behavior of the object, such as methods simulated with `method_missing`.

Check for the presence of a method on an object and determine if a given set of arguments matches the required arity.
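The limitation described in the note comes from how `Method#arity` encodes optional and splat parameters. The short example below uses only core Ruby; `ArityDemo` and its methods are hypothetical names chosen for illustration.

```ruby
# ArityDemo is a hypothetical class used only to inspect Method#arity.
class ArityDemo
  def none; end
  def two(a, b); end
  def optional(a, b = nil); end
  def splat(*rest); end
  def one_plus(a, *rest); end
end

demo = ArityDemo.new
arities = %i[none two optional splat one_plus].map do |name|
  [name, demo.method(name).arity]
end.to_h

arities.each { |name, arity| puts "#{name}: #{arity}" }
# none: 0
# two: 2
# optional: -2
# splat: -1
# one_plus: -2
```

A negative arity of `-n - 1` only says that `n` arguments are required. `optional` accepts at most two arguments while `one_plus` accepts any number, yet both report `-2`, so an upper bound cannot be recovered from the arity alone.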

Parameters:

  • obj (Object)

    the object to check against

  • method (Symbol)

    the method to check the object for

  • args (Array)

    zero or more arguments for the arity check

Raises:

  • (NameError)

    the object does not respond to `method`

  • (ArgumentError)

    the given `args` do not match the arity of `method`

Since:

  • 0.6.0



# File 'lib/concurrent/async.rb', line 115

def validate_argc(obj, method, *args)
  argc = args.length
  arity = obj.method(method).arity

  if arity >= 0 && argc != arity
    raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
  elsif arity < 0 && (arity = (arity + 1).abs) > argc
    raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
  end
end
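To see how the check behaves, the method body from the listing above can be copied into a standalone module and exercised directly. `ArityCheck`, `Greeter`, and the `check` helper are hypothetical names introduced for this sketch; only the body of `validate_argc` comes from the listing.

```ruby
# Hypothetical holder module; the method body is copied from the listing above.
module ArityCheck
  def self.validate_argc(obj, method, *args)
    argc = args.length
    arity = obj.method(method).arity

    if arity >= 0 && argc != arity
      raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
    elsif arity < 0 && (arity = (arity + 1).abs) > argc
      raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
    end
  end
end

class Greeter
  def fixed(name); end
  def greet(name, *titles); end

  # Simulated method that reflection cannot see.
  def method_missing(name, *args)
    name == :ghost ? :boo : super
  end
end

# Returns :ok, or the exception raised by the check.
def check(obj, name, *args)
  ArityCheck.validate_argc(obj, name, *args)
  :ok
rescue NameError, ArgumentError => e
  e
end

g = Greeter.new
results = {
  fixed_ok:      check(g, :fixed, 'Ann'),                # :ok
  fixed_extra:   check(g, :fixed, 'Ann', 'Bob'),         # ArgumentError
  splat_too_few: check(g, :greet),                       # ArgumentError
  splat_many:    check(g, :greet, 'Ann', 'Dr', 'Prof'),  # :ok
  simulated:     check(g, :ghost)                        # NameError
}
results.each { |label, result| puts "#{label}: #{result.inspect}" }
```

The last case shows the blind spot mentioned in the note: `g.ghost` works at runtime through `method_missing`, but `obj.method(:ghost)` raises `NameError` because `Greeter` does not define `respond_to_missing?`.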

Instance Method Details

#async ⇒ Concurrent::Future Also known as: future

Note:

The method call is guaranteed to be thread safe with respect to all other method calls against the same object that are called with either `async` or `await`. The mutable nature of Ruby references (and object orientation in general) prevents any other thread-safety guarantees. Do NOT mix non-protected method calls with protected method calls. Use only protected method calls when sharing the object between threads.

Causes the chained method call to be performed asynchronously on the global thread pool. The method called by this method will return a `Future` object in the `:pending` state and the method call will have been scheduled on the global thread pool. The final disposition of the method call can be obtained by inspecting the returned `Future`.

Before scheduling the method on the global thread pool a best-effort attempt will be made to validate that the method exists on the object and that the given arguments match the arity of the requested function. Due to the dynamic nature of Ruby and limitations of its reflection library, some edge cases will be missed. For more information see the documentation for the `validate_argc` method.

Returns:

  • (Concurrent::Future)

    the pending result of the asynchronous method call

Raises:

  • (Concurrent::InitializationError)

    `#init_mutex` has not been called

  • (NameError)

    the object does not respond to `method`

  • (ArgumentError)

    the given `args` do not match the arity of `method`

Since:

  • 0.6.0



# File 'lib/concurrent/async.rb', line 248

def async
  raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
  @__async_delegator__.value
end

#await ⇒ Concurrent::IVar Also known as: delay

Note:

The method call is guaranteed to be thread safe with respect to all other method calls against the same object that are called with either `async` or `await`. The mutable nature of Ruby references (and object orientation in general) prevents any other thread-safety guarantees. Do NOT mix non-protected method calls with protected method calls. Use only protected method calls when sharing the object between threads.

Causes the chained method call to be performed synchronously on the current thread. The method called by this method will return an `IVar` object in either the `:fulfilled` or `:rejected` state and the method call will have completed. The final disposition of the method call can be obtained by inspecting the returned `IVar`.

Before the method is invoked a best-effort attempt will be made to validate that the method exists on the object and that the given arguments match the arity of the requested function. Due to the dynamic nature of Ruby and limitations of its reflection library, some edge cases will be missed. For more information see the documentation for the `validate_argc` method.

Returns:

  • (Concurrent::IVar)

    the completed result of the synchronous method call

Raises:

  • (Concurrent::InitializationError)

    `#init_mutex` has not been called

  • (NameError)

    the object does not respond to `method`

  • (ArgumentError)

    the given `args` do not match the arity of `method`

Since:

  • 0.6.0



# File 'lib/concurrent/async.rb', line 282

def await
  raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
  @__await_delegator__.value
end

#executor=(executor) ⇒ Object

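Set a new executor on which `async` method calls are run.

Raises:

  • (Concurrent::InitializationError)

    `#init_mutex` has not been called

  • (ArgumentError)

    the executor has already been set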

Since:

  • 0.6.0



# File 'lib/concurrent/async.rb', line 292

def executor=(executor)
  raise InitializationError.new('#init_mutex was never called') unless @__async__mutex__
  @__async__executor__.reconfigure { executor } or
    raise ArgumentError.new('executor has already been set')
end
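The `reconfigure { executor } or raise` idiom in the listing above suggests a lazily evaluated holder whose block can be swapped only until it has been evaluated. The sketch below illustrates that pattern with the standard library. `LazyValue` is a hypothetical class, and the assumption that `reconfigure` returns `false` once the value has been computed is inferred from the listing rather than confirmed by it.

```ruby
# LazyValue is a hypothetical stand-in for a lazily evaluated holder.
class LazyValue
  def initialize(&block)
    @lock = Mutex.new
    @block = block
    @computed = false
    @value = nil
  end

  # Evaluates the block at most once, on first access.
  def value
    @lock.synchronize do
      unless @computed
        @value = @block.call
        @computed = true
      end
      @value
    end
  end

  # Swaps the block if it has not run yet; returns true on success.
  def reconfigure(&block)
    @lock.synchronize do
      return false if @computed
      @block = block
      true
    end
  end
end

calls = 0
pool = LazyValue.new { calls += 1; :default_pool }

swapped_early = pool.reconfigure { :custom_pool } # true: nothing evaluated yet
first = pool.value                                 # :custom_pool
swapped_late = pool.reconfigure { :other_pool }   # false: value already computed
second = pool.value                                # still :custom_pool

puts [swapped_early, first, swapped_late, second, calls].inspect
```

Under that assumption, a caller would need to assign the executor before the first `async` call, since the first call is what would force the lazy value to be evaluated.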

#init_mutex ⇒ Object

Note:

This method must be called from the constructor of the including class or explicitly by the caller prior to calling any other methods. This is the only way thread-safe initialization can be guaranteed.

Initialize the internal mutex and other synchronization objects. If `init_mutex` has not been called, the async/await/executor methods will raise a `Concurrent::InitializationError`.

Raises:

  • (Concurrent::InitializationError)

    `#init_mutex` has already been called

Since:

  • 0.6.0



# File 'lib/concurrent/async.rb', line 310

def init_mutex
  raise InitializationError.new('#init_mutex was already called') if @__async__mutex__
  (@__async__mutex__ = Mutex.new).lock
  @__async__executor__ = Delay.new{ Concurrent.configuration.global_operation_pool }
  @__await_delegator__ = Delay.new{ AwaitDelegator.new(self, @__async__mutex__) }
  @__async_delegator__ = Delay.new{ AsyncDelegator.new(self, @__async__executor__, @__async__mutex__) }
  @__async__mutex__.unlock
end