Class: Concurrent::Channel::Buffer::Unbuffered

Inherits:
Base
  • Object
show all
Defined in:
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb

Overview

A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be put onto the buffer when a thread is waiting to put. When either #put or #take is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made an exchange will occur and the first blocked call will return.

Instance Attribute Summary

Attributes inherited from Base

#capacity

Instance Method Summary collapse

Methods inherited from Base

#blocking?, #close, #closed?, #initialize

Constructor Details

This class inherits a constructor from Concurrent::Channel::Buffer::Base

Instance Method Details

#empty?Boolean

Predicate indicating if the buffer is empty.

Returns:

  • (Boolean)

    true if this buffer is empty else false



27
28
29
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 27

def empty?
  size == 0
end

#full?Boolean

Predicate indicating if the buffer is full.

Returns:

  • (Boolean)

    true if this buffer is full else false



32
33
34
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 32

def full?
  !empty?
end

#nextObject, Boolean

Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.

If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be ‘true` when the buffer is open. The “more” value will be `false` when the channel has been closed and all values have already been received. When “more” is false the returned item will be `Concurrent::NULL`.

Note that when multiple threads access the same channel a race condition can occur when using this method. A call to ‘next` from one thread may return `true` for the second return value, but another thread may `take` the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.

Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. This method exhibits the same blocking behavior as #take.

Returns:

  • (Object, Boolean)

    the first return value will be the item taken from the buffer and the second return value will be a boolean indicating whether or not more items remain.

See Also:



135
136
137
138
139
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 135

def next
  item = take
  more = (item != Concurrent::NULL)
  return item, more
end

#offer(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of ‘false` does not necessarily indicate that the buffer is closed, just that the item could not be added.

Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return ‘true` immediately. When there are no threads waiting to take or the buffer is closed, this method will return `false` immediately.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).



71
72
73
74
75
76
77
78
79
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 71

def offer(item)
  synchronize do
    return false if ns_closed? || taking.empty?

    taken = taking.shift
    taken.value = item
    true
  end
end

#pollObject

Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.

Items can only be taken off the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return ‘Concurrent::NULL` immediately.

Returns:

  • (Object)

    the next item from the buffer or ‘Concurrent::NULL` if the buffer is empty.



117
118
119
120
121
122
123
124
125
126
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 117

def poll
  synchronize do
    return Concurrent::NULL if putting.empty?

    put = putting.shift
    value = put.value
    put.value = nil
    value
  end
end

#put(item) ⇒ Boolean

Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.

Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls ‘take` the exchange will occur and this method will return.

Parameters:

  • item (Object)

    the item/value to put onto the buffer.

Returns:

  • (Boolean)

    true if the item was added to the buffer else false (always false when closed).



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 44

def put(item)
  mine = synchronize do
    return false if ns_closed?

    ref = Concurrent::AtomicReference.new(item)
    if taking.empty?
      putting.push(ref)
    else
      taken = taking.shift
      taken.value = item
      ref.value = nil
    end
    ref
  end
  loop do
    return true if mine.value.nil?
    Thread.pass
  end
end

#sizeObject

The number of items currently in the buffer.



20
21
22
23
24
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 20

def size
  synchronize do
    putting.empty? ? 0 : 1
  end
end

#takeObject

Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken.

Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls ‘pur` the exchange will occur and this method will return.

Returns:

  • (Object)

    the item removed from the buffer; ‘Concurrent::NULL` once the buffer has closed.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 89

def take
  mine = synchronize do
    return Concurrent::NULL if ns_closed? && putting.empty?

    ref = Concurrent::AtomicReference.new(nil)
    if putting.empty?
      taking.push(ref)
    else
      put = putting.shift
      ref.value = put.value
      put.value = nil
    end
    ref
  end
  loop do
    item = mine.value
    return item if item
    Thread.pass
  end
end