Class: Concurrent::CyclicBarrier

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/atomic/cyclic_barrier.rb

Instance Method Summary collapse

Constructor Details

#initialize(parties) { ... } ⇒ CyclicBarrier

Create a new CyclicBarrier that waits for parties threads

Parameters:

  • the number of parties

Yields:

  • an optional block that will be executed that will be executed after the last thread arrives and before the others are released

Raises:

  • if parties is not an integer or is less than zero



14
15
16
17
18
19
20
21
22
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 14

def initialize(parties, &block)
  raise ArgumentError.new('count must be in integer greater than or equal zero') if !parties.is_a?(Fixnum) || parties < 1
  @parties = parties
  @mutex = Mutex.new
  @condition = Condition.new
  @number_waiting = 0
  @action = block
  @generation = Generation.new(:waiting)
end

Instance Method Details

#broken?Boolean

A barrier can be broken when:

  • a thread called the reset method while at least one other thread was waiting

  • at least one thread timed out on wait method

A broken barrier can be restored using reset it’s safer to create a new one

Returns:

  • true if the barrier is broken otherwise false



74
75
76
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 74

def broken?
  @mutex.synchronize { @generation.status != :waiting }
end

#number_waitingFixnum

Returns the number of threads currently waiting on the barrier.

Returns:

  • the number of threads currently waiting on the barrier



30
31
32
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 30

def number_waiting
  @number_waiting
end

#partiesFixnum

Returns the number of threads needed to pass the barrier.

Returns:

  • the number of threads needed to pass the barrier



25
26
27
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 25

def parties
  @parties
end

#resetnil

resets the barrier to its initial state If there is at least one waiting thread, it will be woken up, the wait method will return false and the barrier will be broken If the barrier is broken, this method restores it to the original state

Returns:



62
63
64
65
66
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 62

def reset
  @mutex.synchronize do
    set_status_and_restore(:reset)
  end
end

#wait(timeout = nil) ⇒ Boolean

Blocks on the barrier until the number of waiting threads is equal to parties or until timeout is reached or reset is called If a block has been passed to the constructor, it will be executed once by the last arrived thread before releasing the others

Parameters:

  • (defaults to: nil)

    the number of seconds to wait for the counter or nil to block indefinitely

Returns:

  • true if the count reaches zero else false on timeout or on reset or if the barrier is broken



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/concurrent/atomic/cyclic_barrier.rb', line 38

def wait(timeout = nil)
  @mutex.synchronize do

    return false unless @generation.status == :waiting

    @number_waiting += 1

    if @number_waiting == @parties
      @action.call if @action
      set_status_and_restore(:fulfilled)
      true
    else
      wait_for_wake_up(@generation, timeout)
    end
  end
end