Class: Concurrent::Promises::Channel
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::Promises::Channel
- Defined in:
- lib/concurrent-ruby-edge/concurrent/edge/channel.rb
Overview
A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure, use capacity option in #initialize. #pop method blocks ans #pop_op returns pending future if there is no message in the channel. If the capacity is limited the #push method blocks and #push_op returns pending future.
Constant Summary collapse
- UNLIMITED_CAPACITY =
Default capacity of the Channel, makes it accept unlimited number of messages.
::Object.new
- ANY =
An object which matches anything (with #===)
Object.new.tap do |any| def any.===(other) true end def any.to_s 'ANY' end end
Class Method Summary collapse
- .select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?
- .select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?
- .select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
- .select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
- .try_select(channels) ⇒ ::Array(Channel, Object)
- .try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)
Instance Method Summary collapse
-
#capacity ⇒ Integer
Maximum capacity of the Channel.
-
#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel
constructor
Create channel.
-
#peek(no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel.
-
#peek_matching(matcher, no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel.
-
#pop(timeout = nil, timeout_value = nil) ⇒ Object?
Blocks current thread until a message is available in the channel for popping.
-
#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object?
Blocks current thread until a message is available in the channel for popping.
-
#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
-
#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available.
-
#push(message, timeout = nil) ⇒ self, ...
Blocks current thread until the message is pushed into the channel.
-
#push_op(message) ⇒ ResolvableFuture(self)
Returns future which will fulfill when the message is pushed to the channel.
-
#select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
-
#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
-
#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair.
-
#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair.
-
#size ⇒ Integer
The number of messages currently stored in the channel.
-
#to_s ⇒ String
(also: #inspect)
Short string representation.
-
#try_pop(no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
-
#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
-
#try_push(message) ⇒ true, false
Push the message into the channel if there is space available.
-
#try_select(channels) ⇒ ::Array(Channel, Object)?
If message is available in the receiver or any of the provided channels the channel message pair is returned.
-
#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)?
If message is available in the receiver or any of the provided channels the channel message pair is returned.
Constructor Details
#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel
Create channel.
64 65 66 67 68 69 70 71 72 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 64 def initialize(capacity = UNLIMITED_CAPACITY) super() @Capacity = capacity @Mutex = Mutex.new # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle @Probes = [] @Messages = [] @PendingPush = [] end |
Class Method Details
.select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?
322 323 324 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 322 def select(channels, timeout = nil) channels.first.select(channels[1..-1], timeout) end |
.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?
340 341 342 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 340 def select_matching(matcher, channels, timeout = nil) channels.first.select_matching(matcher, channels[1..-1], timeout) end |
.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
316 317 318 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 316 def select_op(channels, probe = Promises.resolvable_future) channels.first.select_op(channels[1..-1], probe) end |
.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
334 335 336 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 334 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) channels.first.select_op_matching(matcher, channels[1..-1], probe) end |
.try_select(channels) ⇒ ::Array(Channel, Object)
310 311 312 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 310 def try_select(channels) channels.first.try_select(channels[1..-1]) end |
.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)
328 329 330 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 328 def try_select_matching(matcher, channels) channels.first.try_select_matching(matcher, channels[1..-1]) end |
Instance Method Details
#capacity ⇒ Integer
Returns Maximum capacity of the Channel.
295 296 297 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 295 def capacity @Capacity end |
#peek(no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel
209 210 211 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 209 def peek(no_value = nil) peek_matching ANY, no_value end |
#peek_matching(matcher, no_value = nil) ⇒ Object, no_value
Behaves as #try_pop but it does not remove the message from the channel
215 216 217 218 219 220 221 222 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 215 def peek_matching(matcher, no_value = nil) @Mutex.synchronize do = matcher, false return if != NOTHING = ns_consume_pending_push matcher, false return != NOTHING ? : no_value end end |
#pop(timeout = nil, timeout_value = nil) ⇒ Object?
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until a message is available in the channel for popping.
177 178 179 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 177 def pop(timeout = nil, timeout_value = nil) pop_matching ANY, timeout, timeout_value end |
#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object?
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until a message is available in the channel for popping.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 183 def pop_matching(matcher, timeout = nil, timeout_value = nil) # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer # that it returns even if the buffer is full. User might expect that it has to be in the buffer first. probe = @Mutex.synchronize do = matcher if == NOTHING = ns_consume_pending_push matcher return if != NOTHING else = ns_consume_pending_push ANY @Messages.push unless == NOTHING return end probe = Promises.resolvable_future @Probes.push probe, false, matcher probe end probe.value!(timeout, timeout_value, [true, timeout_value, nil]) end |
#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end “‘ or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
160 161 162 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 160 def pop_op(probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(ANY, probe, false) } end |
#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)
Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end “‘ or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
166 167 168 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 166 def pop_op_matching(matcher, probe = Promises.resolvable_future) @Mutex.synchronize { ns_pop_op(matcher, probe, false) } end |
#push(message, timeout = nil) ⇒ self, ...
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
Blocks current thread until the message is pushed into the channel.
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 120 def push(, timeout = nil) pushed_op = @Mutex.synchronize do return timeout ? true : self if ns_try_push() pushed = Promises.resolvable_future # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages @PendingPush.push , pushed pushed end result = pushed_op.wait!(timeout, [true, self, nil]) result == pushed_op ? self : result end |
#push_op(message) ⇒ ResolvableFuture(self)
Returns future which will fulfill when the message is pushed to the channel. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end “‘ or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 101 def push_op() @Mutex.synchronize do if ns_try_push() Promises.fulfilled_future self else pushed = Promises.resolvable_future @PendingPush.push , pushed return pushed end end end |
#select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
278 279 280 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 278 def select(channels, timeout = nil) select_matching ANY, channels, timeout end |
#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?
This function potentially blocks current thread until it can continue. Be careful it can deadlock.
As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.
284 285 286 287 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 284 def select_matching(matcher, channels, timeout = nil) probe = select_op_matching(matcher, channels) probe.value!(timeout, nil, [true, nil, nil]) end |
#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end “‘ or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
257 258 259 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 257 def select_op(channels, probe = Promises.resolvable_future) select_op_matching ANY, channels, probe end |
#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))
When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.‘channel.pop_op.wait(1)` it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later “`ruby pop_op = channel.pop_op if pop_op.wait(1)
pop_op.value
else
pop_op.then { || }
end “‘ or the operation can be prevented from completion after timing out by using `channel.pop_op.wait(1, [true, nil, nil])`. It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.
263 264 265 266 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 263 def select_op_matching(matcher, channels, probe = Promises.resolvable_future) [self, *channels].each { |ch| ch.partial_select_op matcher, probe } probe end |
#size ⇒ Integer
Returns The number of messages currently stored in the channel.
290 291 292 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 290 def size @Mutex.synchronize { @Messages.size } end |
#to_s ⇒ String Also known as: inspect
Returns Short string representation.
300 301 302 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 300 def to_s format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity end |
#try_pop(no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
138 139 140 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 138 def try_pop(no_value = nil) try_pop_matching ANY, no_value end |
#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value
Pop a message from the channel if there is one available.
145 146 147 148 149 150 151 152 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 145 def try_pop_matching(matcher, no_value = nil) @Mutex.synchronize do = matcher return if != NOTHING = ns_consume_pending_push matcher return != NOTHING ? : no_value end end |
#try_push(message) ⇒ true, false
Push the message into the channel if there is space available.
77 78 79 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 77 def try_push() @Mutex.synchronize { ns_try_push() } end |
#try_select(channels) ⇒ ::Array(Channel, Object)?
If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.
232 233 234 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 232 def try_select(channels) try_select_matching ANY, channels end |
#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)?
If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.
238 239 240 241 242 243 244 245 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 238 def try_select_matching(matcher, channels) = nil channel = [self, *channels].find do |ch| = ch.try_pop_matching(matcher, NOTHING) != NOTHING end channel ? [channel, ] : nil end |