Module: Concurrent

Extended by:
Logging
Defined in:
lib/concurrent.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/tvar.rb,
lib/concurrent/agent.rb,
lib/concurrent/async.rb,
lib/concurrent/delay.rb,
lib/concurrent/errors.rb,
lib/concurrent/future.rb,
lib/concurrent/actress.rb,
lib/concurrent/logging.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/runnable.rb,
lib/concurrent/exchanger.rb,
lib/concurrent/stoppable.rb,
lib/concurrent/obligation.rb,
lib/concurrent/observable.rb,
lib/concurrent/supervisor.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/actor/actor.rb,
lib/concurrent/actress/core.rb,
lib/concurrent/atomic/event.rb,
lib/concurrent/atomic/atomic.rb,
lib/concurrent/configuration.rb,
lib/concurrent/utility/timer.rb,
lib/concurrent/actor/postable.rb,
lib/concurrent/actress/ad_hoc.rb,
lib/concurrent/actress/errors.rb,
lib/concurrent/options_parser.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/actress/context.rb,
lib/concurrent/channel/channel.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/utility/timeout.rb,
lib/concurrent/actress/envelope.rb,
lib/concurrent/atomic/condition.rb,
lib/concurrent/actress/reference.rb,
lib/concurrent/executor/executor.rb,
lib/concurrent/actress/type_check.rb,
lib/concurrent/executor/timer_set.rb,
lib/concurrent/executor/one_by_one.rb,
lib/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent/atomic/atomic_boolean.rb,
lib/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent/channel/waitable_list.rb,
lib/concurrent/collection/ring_buffer.rb,
lib/concurrent/atomic/count_down_latch.rb,
lib/concurrent/atomic/thread_local_var.rb,
lib/concurrent/utility/processor_count.rb,
lib/concurrent/actress/core_delegations.rb,
lib/concurrent/channel/buffered_channel.rb,
lib/concurrent/collection/priority_queue.rb,
lib/concurrent/channel/unbuffered_channel.rb,
lib/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent/executor/cached_thread_pool.rb,
lib/concurrent/executor/immediate_executor.rb,
lib/concurrent/executor/safe_task_executor.rb,
lib/concurrent/executor/per_thread_executor.rb,
lib/concurrent/executor/thread_pool_executor.rb,
lib/concurrent/collection/blocking_ring_buffer.rb,
lib/concurrent/executor/java_fixed_thread_pool.rb,
lib/concurrent/executor/ruby_fixed_thread_pool.rb,
lib/concurrent/executor/single_thread_executor.rb,
lib/concurrent/executor/java_cached_thread_pool.rb,
lib/concurrent/executor/ruby_cached_thread_pool.rb,
lib/concurrent/executor/ruby_thread_pool_worker.rb,
lib/concurrent/atomic/copy_on_write_observer_set.rb,
lib/concurrent/atomic/copy_on_notify_observer_set.rb,
lib/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent/executor/ruby_single_thread_executor.rb

Overview

Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.

The design goals of this gem are:

  • Stay true to the spirit of the languages providing inspiration

  • But implement in a way that makes sense for Ruby

  • Keep the semantics as idiomatic Ruby as possible

  • Support features that make sense in Ruby

  • Exclude features that don’t make sense in Ruby

  • Be small, lean, and loosely coupled

Defined Under Namespace

Modules: Actress, Async, Channel, Dereferenceable, Executor, JavaExecutor, Logging, Obligation, Observable, OptionsParser, Postable, RubyExecutor, Runnable, Stoppable, ThreadLocalJavaStorage, ThreadLocalNewStorage, ThreadLocalOldStorage, ThreadLocalSymbolAllocator

Classes: Actor, Agent, Atomic, AtomicBoolean, AtomicFixnum, BlockingRingBuffer, BufferedChannel, CachedThreadPool, Condition, Configuration, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, IVar, ImmediateExecutor, JavaAtomicBoolean, JavaAtomicFixnum, JavaCachedThreadPool, JavaCountDownLatch, JavaFixedThreadPool, JavaPriorityQueue, JavaSingleThreadExecutor, JavaThreadPoolExecutor, MVar, MutexAtomic, MutexAtomicBoolean, MutexAtomicFixnum, MutexCountDownLatch, MutexPriorityQueue, OneByOne, PerThreadExecutor, PriorityQueue, ProcessorCounter, Promise, RingBuffer, RubyCachedThreadPool, RubyFixedThreadPool, RubySingleThreadExecutor, RubyThreadPoolExecutor, SafeTaskExecutor, ScheduledTask, SingleThreadExecutor, Supervisor, TVar, ThreadLocalVar, ThreadPoolExecutor, TimerSet, TimerTask, Transaction, UnbufferedChannel, WaitableList

Constant Summary

ConfigurationError =

Raised when errors occur during configuration.

Class.new(StandardError)
LifecycleError =

Raised when a lifecycle method (such as `stop`) is called in an improper sequence or when the object is in an inappropriate state.

Class.new(StandardError)
InitializationError =

Raised when an object’s methods are called when it has not been properly initialized.

Class.new(StandardError)
MaxRestartFrequencyError =

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

Class.new(StandardError)
MultipleAssignmentError =

Raised when an attempt is made to modify an immutable object (such as an `IVar`) after its final state has been set.

Class.new(StandardError)
RejectedExecutionError =

Raised by an `Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.

Class.new(StandardError)
TimeoutError =

Raised when an operation times out.

Class.new(StandardError)
VERSION =
'0.6.0'
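
Each error constant above is created with `Class.new(StandardError)`, which yields an ordinary exception class that can be raised and rescued by name. The following is a minimal sketch of that pattern using only core Ruby. `SketchTimeoutError`, `slow_operation`, and `run_with_fallback` are hypothetical names chosen for illustration and are not part of the gem.

```ruby
# Sketch only: these names are hypothetical and do not come from the gem.
# Assigning the anonymous class to a constant gives it that constant's name.
SketchTimeoutError = Class.new(StandardError)

# Stand-in for an operation that fails with the custom error.
def slow_operation
  raise SketchTimeoutError, 'operation timed out'
end

# Rescue the custom error by name and turn it into a fallback value.
def run_with_fallback
  slow_operation
rescue SketchTimeoutError => e
  "recovered: #{e.message}"
end

run_with_fallback #=> "recovered: operation timed out"
```

Because the classes descend from `StandardError`, a bare `rescue` clause will also catch them.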

Class Method Summary

Methods included from Logging

log

Class Method Details

.abort_transaction ⇒ Object

Abort a currently running transaction - see `Concurrent::atomically`.



# File 'lib/concurrent/tvar.rb', line 140

def abort_transaction
  raise Transaction::AbortError.new
end

.atomically ⇒ Object

Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end
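
The warning that the block may be executed more than once follows from optimistic retry: the work is attempted, and if another writer got in first the attempt is discarded and the block is run again. The following is a minimal sketch of that idea in core Ruby. `VersionedCell`, `optimistic_update`, and `demo_conflict` are hypothetical names and are not the gem's `TVar` or `Transaction` classes, whose internals are not shown here.

```ruby
# Sketch only: hypothetical names, not the gem's TVar or Transaction.
class VersionedCell
  attr_reader :value, :version

  def initialize(value)
    @value = value
    @version = 0
    @lock = Mutex.new
  end

  # Store new_value only if nobody has written since expected_version was read.
  def commit(expected_version, new_value)
    @lock.synchronize do
      return false unless @version == expected_version
      @value = new_value
      @version += 1
      true
    end
  end
end

# Re-run the block until its result commits; returns the number of attempts.
def optimistic_update(cell)
  attempts = 0
  loop do
    attempts += 1
    seen_version = cell.version
    seen_value = cell.value
    new_value = yield(seen_value, attempts)
    return attempts if cell.commit(seen_version, new_value)
  end
end

# Simulate a competing writer that interferes with the first attempt only.
def demo_conflict
  cell = VersionedCell.new(100)
  attempts = optimistic_update(cell) do |current, attempt|
    cell.commit(cell.version, current + 1) if attempt == 1
    current - 10
  end
  [cell.value, attempts]
end

demo_conflict #=> [91, 2]
```

The block ran twice, so any side effect inside it other than the returned value would have happened twice as well.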

Raises:

  • (ArgumentError)


# File 'lib/concurrent/tvar.rb', line 86

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop
      
      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue => e
          transaction.abort
          throw e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end

.configure {|configuration| ... } ⇒ Object

Perform gem-level configuration.

Yields:

  • the configuration commands

Yield Parameters:

  • configuration (Configuration)

    the current configuration object



# File 'lib/concurrent/configuration.rb', line 121

def self.configure
  yield(configuration)
end
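
The listing above yields a shared configuration object to the caller's block. The following is a minimal sketch of that block-configuration pattern in core Ruby. `SketchGem`, `SketchConfiguration`, and `pool_size` are hypothetical names, and the gem's actual configuration options are not shown here.

```ruby
# Sketch only: SketchGem, SketchConfiguration and pool_size are hypothetical.
module SketchGem
  class SketchConfiguration
    attr_accessor :pool_size

    def initialize
      @pool_size = 2 # illustrative default
    end
  end

  # One shared configuration object, created on first use.
  # (The lazy ||= is not thread-safe; acceptable for a sketch.)
  def self.configuration
    @configuration ||= SketchConfiguration.new
  end

  # Yield the shared configuration so the caller can adjust it in a block.
  def self.configure
    yield(configuration)
  end
end

SketchGem.configure { |config| config.pool_size = 8 }
SketchGem.configuration.pool_size #=> 8
```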

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are `Future` values. The dataflow task itself is also a `Future` value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka’s `flow` and Habanero Java’s `DataDrivenFuture`. However, unlike Akka we don’t schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 80s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Examples:

Parallel Fibonacci calculator

def fib(n)
  if n < 2
    Concurrent::dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...

# wait up to 1 second for the answer...
f.value(1) #=> 377

Parameters:

  • inputs (Future)

    zero or more `Future` operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Object)

    the value of each of the `Future` inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Future)

    a `Future` that will hold the result of the block operation

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not `IVar`s



# File 'lib/concurrent/dataflow.rb', line 63

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.configuration.global_task_pool, *inputs, &block)
end

.dataflow_with(executor, *inputs, &block) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/concurrent/dataflow.rb', line 68

def dataflow_with(executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
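
The listing above registers a `DependencyCounter` as an observer on every input and executes the result only when the counter fires. That class is not shown on this page, so the following is a hypothetical stand-in sketching how such a countdown observer might work. It assumes observers are notified through an `update` method, as in Ruby's standard observer convention. `SketchDependencyCounter` and `demo_counter` are illustrative names only.

```ruby
# Sketch only: a hypothetical stand-in, not the gem's DependencyCounter.
class SketchDependencyCounter
  def initialize(count, &block)
    @remaining = count
    @block = block
    @lock = Mutex.new
  end

  # Called once per completed dependency; runs the block on the final call.
  def update(*_args)
    fire = @lock.synchronize do
      @remaining -= 1
      @remaining.zero?
    end
    @block.call if fire
  end
end

# Notify the counter three times and record what had run after the second call.
def demo_counter
  fired = []
  counter = SketchDependencyCounter.new(3) { fired << :ran }
  after_second = nil
  3.times do |i|
    counter.update
    after_second = fired.dup if i == 1
  end
  [after_second, fired]
end

demo_counter #=> [[], [:ran]]
```

The decrement happens under a mutex because inputs may complete on different threads, while the block itself is called outside the lock.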

.physical_processor_count ⇒ Object



# File 'lib/concurrent/utility/processor_count.rb', line 146

def self.physical_processor_count
  processor_counter.physical_processor_count
end

.processor_count ⇒ Object



# File 'lib/concurrent/utility/processor_count.rb', line 142

def self.processor_count
  processor_counter.processor_count
end
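
Neither processor-count method carries a description on this page. As a rough illustration of how such a count is commonly used, the sketch below sizes a worker pool from the number of available processors. It relies on `Etc.nprocessors` from Ruby's standard library rather than on the gem's methods, and it is an assumption that the two agree on any given platform. `suggested_pool_size` is a hypothetical helper.

```ruby
require 'etc'

# Sketch only: suggested_pool_size is hypothetical and not part of the gem.
# Etc.nprocessors is the standard library's count of available processors.
def suggested_pool_size(multiplier = 2)
  Etc.nprocessors * multiplier
end

suggested_pool_size    # twice the processor count on this machine
suggested_pool_size(1) # exactly the processor count
```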

.timeout(seconds) ⇒ Object

Note:

This method is intended to be a simpler and more reliable replacement for the Ruby standard library `Timeout::timeout` method.

Wait the given number of seconds for the block operation to complete.

Parameters:

  • seconds (Integer)

    The number of seconds to wait

Returns:

  • (Object)

    The result of the block operation

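Raises:

  • (Concurrent::TimeoutError)

    when the block operation does not complete within the given number of seconds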



# File 'lib/concurrent/utility/timeout.rb', line 19

def timeout(seconds)

  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end
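
The listing above runs the block on a separate thread, waits with `Thread#join`, and kills the thread afterwards. The following standalone sketch mirrors that logic so it can be run without the gem. `DemoTimeoutError` and `demo_timeout` are hypothetical names standing in for `Concurrent::TimeoutError` and `Concurrent::timeout`.

```ruby
# Sketch only: hypothetical names that mirror the listing above.
DemoTimeoutError = Class.new(StandardError)

def demo_timeout(seconds)
  # Run the caller's block on a worker thread and stash its result there.
  worker = Thread.new { Thread.current[:result] = yield }

  # Thread#join returns nil when the wait expires before the thread finishes.
  finished = worker.join(seconds)
  raise DemoTimeoutError unless finished

  worker[:result]
ensure
  # Stop the worker whether it finished or not; killing a dead thread is harmless.
  worker.kill unless worker.nil?
end

demo_timeout(1) { 21 * 2 } #=> 42

begin
  demo_timeout(0.05) { sleep 1 }
rescue DemoTimeoutError
  # The block was still running after 0.05 seconds.
end
```

Killing the worker is abrupt, so a block that holds resources should release them in its own `ensure` clause.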

.timer(seconds, *args) { ... } ⇒ Boolean

Perform the given operation asynchronously after the given number of seconds.

Parameters:

  • seconds (Fixnum)

    the interval in seconds to wait before executing the task

Yields:

  • the task to execute

Returns:

  • (Boolean)

    true

Raises:

  • (ArgumentError)


# File 'lib/concurrent/utility/timer.rb', line 13

def timer(seconds, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0

  Concurrent.configuration.global_timer_set.post(seconds, *args, &block)
  true
end
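
The listing above validates its arguments and then posts the block to a shared global timer set. The following sketch reproduces the same calling contract in core Ruby so it can be tried without the gem. `demo_timer` is a hypothetical name, and it starts one thread per task instead of using a timer set, which is a simplification rather than the gem's approach.

```ruby
# Sketch only: demo_timer is hypothetical and uses one thread per task,
# whereas the listing above posts to a shared global timer set.
def demo_timer(seconds, *args, &block)
  raise ArgumentError, 'no block given' unless block
  raise ArgumentError, 'interval must be greater than or equal to zero' if seconds < 0

  # Sleep in the background, then run the task with the supplied arguments.
  Thread.new do
    sleep(seconds)
    block.call(*args)
  end
  true
end

results = Queue.new
demo_timer(0.05, 2, 3) { |a, b| results << a + b } #=> true
results.pop #=> 5
```

The call returns `true` immediately, so the task's result has to be passed back through something the block writes to, such as the queue used here.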