Class: Concurrent::Channel::Buffer::Unbuffered
- Inherits: Base
  - Object
  - Synchronization::LockableObject
  - Base
  - Concurrent::Channel::Buffer::Unbuffered
- Defined in: lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb
Overview
A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be taken from the buffer when a thread is waiting to put. When either #put or #take is called and there is no corresponding call in progress, the call blocks indefinitely. Any other calls to the same method queue behind the first call and block as well. As soon as a corresponding put/take call is made, an exchange occurs and the first blocked call returns.
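The handoff described above can be modeled with the standard library alone. The sketch below is not the library class: `RendezvousSketch` and its internals are hypothetical names, and it uses a `Mutex` and `ConditionVariable` rather than the library's own synchronization. It only illustrates that a `put` does not return until a matching `take` has happened.

```ruby
# Hypothetical stand-in for a zero-capacity buffer (not the library class).
class RendezvousSketch
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @putting = [] # slots from blocked producers, oldest first
  end

  # Blocks until a consumer has taken the item.
  def put(item)
    slot = { item: item, taken: false }
    @lock.synchronize do
      @putting << slot
      @cond.broadcast                      # wake any waiting consumer
      @cond.wait(@lock) until slot[:taken] # block until the exchange happens
    end
    true
  end

  # Blocks until a producer is waiting, then takes the oldest item.
  def take
    @lock.synchronize do
      @cond.wait(@lock) while @putting.empty?
      slot = @putting.shift
      slot[:taken] = true
      @cond.broadcast                      # release the blocked producer
      slot[:item]
    end
  end
end

buffer = RendezvousSketch.new
events = Queue.new

producer = Thread.new do
  buffer.put(:hello)
  events << :put_returned
end

sleep 0.1                                # give the producer time to block
blocked_before_take = events.empty?      # put has not returned yet
received = buffer.take                   # the exchange happens here
producer.join
```

The producer can only record `:put_returned` after the main thread calls `take`, which is the exchange behavior the overview describes.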
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #empty? ⇒ Boolean
  Predicate indicating if the buffer is empty.
- #full? ⇒ Boolean
  Predicate indicating if the buffer is full.
- #next ⇒ Object, Boolean
  Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken.
- #offer(item) ⇒ Boolean
  Put an item onto the buffer if possible.
- #poll ⇒ Object
  Take the next item from the buffer if one is available else return immediately.
- #put(item) ⇒ Boolean
  Put an item onto the buffer if possible.
- #size ⇒ Object
  The number of items currently in the buffer.
- #take ⇒ Object
  Take an item from the buffer if one is available.
Methods inherited from Base
#blocking?, #close, #closed?, #initialize
Constructor Details
This class inherits a constructor from Concurrent::Channel::Buffer::Base
Instance Method Details
#empty? ⇒ Boolean
Predicate indicating if the buffer is empty.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 27
def empty?
  size == 0
end
#full? ⇒ Boolean
Predicate indicating if the buffer is full.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 32
def full?
  !empty?
end
#next ⇒ Object, Boolean
Take the next “item” from the buffer and also return a boolean indicating if “more” items can be taken. Used for iterating over a buffer until it is closed and empty.
If the buffer is open but no items remain, the calling thread will block until an item is available. The second of the two return values, “more” (a boolean), will always be `true` when the buffer is open. The “more” value will be `false` when the channel has been closed and all values have already been received. When “more” is false the returned item will be `Concurrent::NULL`.
Note that when multiple threads access the same channel a race condition can occur when using this method. A call to `next` from one thread may return `true` for the second return value, but another thread may `take` the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions.
Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. This method exhibits the same blocking behavior as #take.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 135
def next
  item = take
  more = (item != Concurrent::NULL)
  return item, more
end
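A typical use of the `item, more` pair is a loop that drains a buffer until it is closed and empty. The sketch below runs against a hypothetical stub (`ClosableStub`, built on the standard library `Queue`, with `SKETCH_NULL` standing in for `Concurrent::NULL`) so that it needs no gem. Only the shape of the loop is meant to carry over to a real buffer.

```ruby
SKETCH_NULL = Object.new # stand-in for Concurrent::NULL (assumption)

# Hypothetical stub exposing the same take/next contract as the docs describe.
class ClosableStub
  def initialize(items)
    @queue = Queue.new
    items.each { |item| @queue << item }
    @queue.close # no more items will ever arrive
  end

  def take
    item = @queue.pop # returns nil once the closed queue is drained
    item.nil? ? SKETCH_NULL : item
  end

  # Same logic as the documented #next: item plus a "more" flag.
  def next
    item = take
    more = (item != SKETCH_NULL)
    return item, more
  end
end

buffer = ClosableStub.new([1, 2, 3])
received = []

loop do
  item, more = buffer.next
  break unless more # closed and drained; item is the NULL sentinel here
  received << item
end

last_item, last_more = buffer.next # further calls keep reporting "no more"
```

Checking `more` before using `item` keeps the sentinel out of the results, which matters when several threads may drain the same buffer.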
#offer(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of `false` does not necessarily indicate that the buffer is closed, just that the item could not be added.
Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return `true` immediately. When there are no threads waiting to take or the buffer is closed, this method will return `false` immediately.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 71
def offer(item)
  synchronize do
    return false if ns_closed? || taking.empty?

    taken = taking.shift
    taken.value = item
    true
  end
end
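The three outcomes of `offer` (no taker, waiting taker, closed buffer) can be traced with a single-threaded model. `OfferModel` below is a hypothetical simplification: it uses plain hashes as slots and omits the locking, so it shows the decision logic only, not the thread safety of the real method.

```ruby
# Single-threaded model of the offer decision (hypothetical, no locking).
class OfferModel
  attr_reader :taking

  def initialize
    @taking = []   # slots registered by takers that are waiting
    @closed = false
  end

  def close
    @closed = true
  end

  def offer(item)
    return false if @closed || @taking.empty?

    slot = @taking.shift # oldest waiting taker gets the item
    slot[:value] = item
    true
  end
end

model = OfferModel.new

no_taker = model.offer(:a)     # nobody is waiting, so the item is refused

slot = { value: nil }
model.taking << slot           # simulate one blocked taker
with_taker = model.offer(:b)   # handed straight to that taker

model.taking << { value: nil } # a taker is waiting, but...
model.close
after_close = model.offer(:c)  # ...a closed buffer refuses the item
```

Because `false` covers both "no taker" and "closed", a caller that needs to tell them apart would have to check `closed?` separately.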
#poll ⇒ Object
Take the next item from the buffer if one is available; otherwise return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty.
Items can only be taken off the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return `Concurrent::NULL` immediately.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 117
def poll
  synchronize do
    return Concurrent::NULL if putting.empty?

    put = putting.shift
    value = put.value
    put.value = nil
    value
  end
end
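Since `poll` never blocks, a caller that wants to wait for a bounded time has to retry it. One possible pattern is sketched below. `poll_until` is a hypothetical helper, not part of the library, and `PollStub` with `SKETCH_NULL` stands in for a real buffer and `Concurrent::NULL` so the example runs on the standard library alone.

```ruby
SKETCH_NULL = Object.new # stand-in for Concurrent::NULL (assumption)

# Hypothetical stub: returns the sentinel a fixed number of times, then an item.
class PollStub
  def initialize(empty_polls, item)
    @remaining = empty_polls
    @item = item
  end

  def poll
    if @remaining > 0
      @remaining -= 1
      SKETCH_NULL
    else
      @item
    end
  end
end

# Hypothetical helper: retry poll until an item arrives or the timeout expires.
def poll_until(buffer, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    item = buffer.poll
    return item unless item.equal?(SKETCH_NULL)
    return SKETCH_NULL if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    Thread.pass # let other threads run before retrying
  end
end

found  = poll_until(PollStub.new(3, :job), 1.0)                # item after 3 misses
missed = poll_until(PollStub.new(Float::INFINITY, :job), 0.05) # never arrives
```

A busy retry loop like this trades CPU time for a bounded wait. When an unbounded wait is acceptable, `take` is the simpler choice.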
#put(item) ⇒ Boolean
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer.
Items can only be put onto the buffer when one or more threads are waiting to #take items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls `take` the exchange will occur and this method will return.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 44
def put(item)
  mine = synchronize do
    return false if ns_closed?

    ref = Concurrent::AtomicReference.new(item)
    if taking.empty?
      putting.push(ref)
    else
      taken = taking.shift
      taken.value = item
      ref.value = nil
    end
    ref
  end

  loop do
    return true if mine.value.nil?
    Thread.pass
  end
end
#size ⇒ Object
The number of items currently in the buffer.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 20
def size
  synchronize do
    putting.empty? ? 0 : 1
  end
end
#take ⇒ Object
Take an item from the buffer if one is available. If the buffer is open and no item is available, the calling thread will block until an item is available. If the buffer is closed but items are available, the remaining items can still be taken. Once the buffer is closed and no items remain, this method returns `Concurrent::NULL` immediately.
Items can only be taken from the buffer when one or more threads are waiting to #put items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls `put` the exchange will occur and this method will return.
# File 'lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb', line 89
def take
  mine = synchronize do
    return Concurrent::NULL if ns_closed? && putting.empty?

    ref = Concurrent::AtomicReference.new(nil)
    if putting.empty?
      taking.push(ref)
    else
      put = putting.shift
      ref.value = put.value
      put.value = nil
    end
    ref
  end

  loop do
    item = mine.value
    return item if item
    Thread.pass
  end
end