Class: Async::Barrier

Inherits:
Object
  • Object
show all
Defined in:
lib/async/barrier.rb

Overview

A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent: nil) ⇒ Barrier

Initialize the barrier.



19
20
21
22
23
24
25
# File 'lib/async/barrier.rb', line 19

# Initialize the barrier.
#
# @param parent [Object, nil] Optional default parent used by `async` to start child tasks; when `nil`, `async` falls back to `Task.current`.
def initialize(parent: nil)
	@parent = parent
	
	# Nodes for the tasks the barrier is still tracking:
	@tasks = List.new
	
	# Receives the node of each task as it finishes:
	@finished = Queue.new
	
	# Signalled by a newly started task once it has registered itself in `@tasks`:
	@condition = Condition.new
end

Instance Attribute Details

#tasks ⇒ Object (readonly)

All tasks currently held by the barrier: each task is added when it is started via #async and removed as #wait processes it.



43
44
45
# File 'lib/async/barrier.rb', line 43

# The list of nodes for the tasks currently tracked by the barrier.
#
# @return [List] The barrier's internal task list (not a copy).
def tasks
	@tasks
end

Instance Method Details

#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object

Execute a child task and add it to the barrier.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/async/barrier.rb', line 48

# Execute a child task and add it to the barrier.
#
# @param arguments [Array] Positional arguments forwarded to `parent.async`; whatever it yields after the task is passed on to the block.
# @param parent [Object] The object whose `async` method starts the child task. Defaults to the barrier's parent, falling back to `Task.current`.
# @param options [Hash] Keyword options forwarded to `parent.async`.
# @yield [task, *arguments] The work to run inside the child task.
# @return [Object] Whatever `parent.async` returns for the child task.
# @raise [RuntimeError] If the finished queue has been closed, which `cancel` does.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
	raise "Barrier is stopped!" if @finished.closed?
	
	registered = nil
	
	child = parent.async(*arguments, **options) do |task, *forwarded|
		# Track the running task with a fresh list node:
		entry = TaskNode.new(task)
		@tasks.append(entry)
		
		# Tell the caller below that the task is now registered with the barrier:
		registered = entry
		@condition.signal
		
		# Run the user's block; it may raise, in which case the `ensure` below still reports completion:
		block.call(task, *forwarded)
	ensure
		# Hand the node to whoever is waiting, unless the finished queue has been closed in the meantime:
		@finished.signal(entry) unless @finished.closed?
	end
	
	# `parent.async` may return before the child block has run, so block here until the child has appended itself to `@tasks`; otherwise `wait` could return early and never track it:
	@condition.wait until registered
	
	child
end

#cancel ⇒ Object

Cancel all tasks held by the barrier.



118
119
120
121
122
123
124
# File 'lib/async/barrier.rb', line 118

# Cancel all tasks held by the barrier.
#
# Every tracked task is cancelled in list order, then the finished queue is closed, after which `async` refuses to start new tasks.
#
# @return [Object] Whatever `@finished.close` returns.
def cancel
	@tasks.each { |node| node.task.cancel }
	
	# Closing the queue marks the barrier as stopped:
	@finished.close
end

#empty? ⇒ Boolean

Whether the barrier is empty, i.e. it is not currently holding any tasks.

Returns:

  • (Boolean)


77
78
79
# File 'lib/async/barrier.rb', line 77

# Whether the barrier is currently holding no tasks.
#
# @return [Boolean] The result of `@tasks.empty?`.
def empty?
	@tasks.empty?
end

#size ⇒ Object

Number of tasks being held by the barrier.



38
39
40
# File 'lib/async/barrier.rb', line 38

# Number of tasks being held by the barrier.
#
# @return [Object] The result of `@tasks.size`.
def size
	@tasks.size
end

#stop ⇒ Object

Deprecated.

Use #cancel instead.

Backward compatibility alias for #cancel.



128
129
130
# File 'lib/async/barrier.rb', line 128

# Backward compatibility alias for `cancel`.
#
# @deprecated Use `cancel` instead.
# @return [Object] Whatever `cancel` returns.
def stop
	cancel
end

#wait ⇒ Object

Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/async/barrier.rb', line 87

# Wait for all tasks held by the barrier to finish.
#
# Each finished task's node is taken from the finished queue and removed from the barrier. Without a block, `wait` is then invoked on the task, which may raise an error; with a block, the task is yielded instead.
#
# @yield [task] Optional; receives each finished task in place of calling `task.wait`.
# @return [Integer, nil] The number of finished tasks processed, or `nil` if the barrier held no tasks on entry.
def wait
	return if @tasks.empty?
	
	processed = 0
	
	until @tasks.empty?
		# Take the node of the next finished task; a falsy result means there is nothing more to receive, so stop:
		node = @finished.wait
		break unless node
		
		processed += 1
		
		# The task is finishing, so the barrier no longer needs to hold it:
		@tasks.remove?(node)
		
		if block_given?
			# Let the caller decide how to handle the finished task:
			yield node.task
		else
			# Either completes or raises the task's error:
			node.task.wait
		end
	end
	
	processed
end