Class: Async::Barrier
- Inherits:
-
Object
- Object
- Async::Barrier
- Defined in:
- lib/async/barrier.rb
Overview
A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.
Instance Attribute Summary collapse
-
#tasks ⇒ Object
readonly
All tasks which have been invoked into the barrier.
Instance Method Summary collapse
-
#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
Execute a child task and add it to the barrier.
-
#cancel ⇒ Object
Cancel all tasks held by the barrier.
-
#empty? ⇒ Boolean
Whether there are any tasks being held by the barrier.
-
#initialize(parent: nil) ⇒ Barrier
constructor
Initialize the barrier.
-
#size ⇒ Object
Number of tasks being held by the barrier.
-
#stop ⇒ Object
deprecated
Deprecated.
Use #cancel instead.
-
#wait ⇒ Object
Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error.
Constructor Details
Instance Attribute Details
#tasks ⇒ Object (readonly)
All tasks which have been invoked into the barrier.
43 44 45 |
# File 'lib/async/barrier.rb', line 43 def tasks @tasks end |
Instance Method Details
#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
Execute a child task and add it to the barrier.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/async/barrier.rb', line 48 def async(*arguments, parent: (@parent or Task.current), **, &block) raise "Barrier is stopped!" if @finished.closed? waiting = nil task = parent.async(*arguments, **) do |task, *arguments| # Create a new list node for the task and add it to the list of waiting tasks: node = TaskNode.new(task) @tasks.append(node) # Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish: waiting = node @condition.signal # Invoke the block, which may raise an error. If it does, we will still signal that the task has finished: block.call(task, *arguments) ensure # Signal that the task has finished, which will unblock the waiting task: @finished.signal(node) unless @finished.closed? end # `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it: @condition.wait while waiting.nil? return task end |
#cancel ⇒ Object
Cancel all tasks held by the barrier.
118 119 120 121 122 123 124 |
# File 'lib/async/barrier.rb', line 118 def cancel @tasks.each do |waiting| waiting.task.cancel end @finished.close end |
#empty? ⇒ Boolean
Whether there are any tasks being held by the barrier.
77 78 79 |
# File 'lib/async/barrier.rb', line 77 def empty? @tasks.empty? end |
#size ⇒ Object
Number of tasks being held by the barrier.
38 39 40 |
# File 'lib/async/barrier.rb', line 38 def size @tasks.size end |
#stop ⇒ Object
128 129 130 |
# File 'lib/async/barrier.rb', line 128 def stop cancel end |
#wait ⇒ Object
Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/async/barrier.rb', line 87 def wait return nil if @tasks.empty? count = 0 while true # Wait for a task to finish (we get the task node): break unless waiting = @finished.wait count += 1 # Remove the task as it is now finishing: @tasks.remove?(waiting) # Get the task: task = waiting.task # If a block is given, the user can implement their own behaviour: if block_given? yield task else # Wait for it to either complete or raise an error: task.wait end break if @tasks.empty? end return count end |