Class: Going::Channel
- Inherits:
- Object
  - Object
    - Going::Channel
- Extended by:
- BooleanAttrReader
- Defined in:
- lib/going/channel.rb
Overview
This class represents a message channel with a fixed capacity. A push blocks while the channel is at capacity, and a shift blocks while no message is available.
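The blocking behaviour described above can be illustrated with Ruby's standard library. This is only an analogue: `SizedQueue` is not `Going::Channel`, and the helper name `would_block_states` is made up for this sketch. It uses the non-blocking flag so that the "would block" cases show up as `ThreadError` instead of suspending the thread.

```ruby
# Stdlib analogue only; SizedQueue is NOT Going::Channel.
# Reports whether a push would block on a full buffer and whether
# a pop would block on an empty one.
def would_block_states
  queue = SizedQueue.new(1)      # room for exactly one buffered message
  queue.push(:first)             # fits in the buffer, returns immediately

  full = begin
    queue.push(:second, true)    # non_block = true: raise instead of waiting
    false
  rescue ThreadError
    true                         # buffer is at capacity
  end

  queue.pop                      # drain the single buffered message

  empty = begin
    queue.pop(true)              # non_block = true: raise instead of waiting
    false
  rescue ThreadError
    true                         # nothing left to receive
  end

  { full: full, empty: empty }
end

p would_block_states
```

Without the non-blocking flag, both calls would simply wait until another thread makes room or supplies a message.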
Instance Attribute Summary
- #capacity ⇒ Object (readonly)
  Returns the capacity of the channel.
Instance Method Summary
- #close ⇒ Object
  Closes the channel.
- #each ⇒ Object
  Calls the given block once for each message until the channel is closed, passing that message as a parameter.
- #empty? ⇒ Boolean
  Returns whether the channel is empty.
- #initialize(capacity = 0) {|_self| ... } ⇒ Channel (constructor)
  Creates a fixed-length channel with a capacity of capacity.
- #inspect ⇒ Object
- #push(obj, &on_complete) ⇒ Object (also: #<<, #yield)
  Pushes obj to the channel.
- #shift(&on_complete) ⇒ Object (also: #receive, #next)
  Receives data from the channel.
- #size ⇒ Object (also: #length)
  Returns the number of messages in the channel.
Methods included from BooleanAttrReader
Constructor Details
#initialize(capacity = 0) {|_self| ... } ⇒ Channel
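Creates a fixed-length channel with the given capacity (default 0). Raises ArgumentError if capacity is negative, and yields the new channel if a block is given.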
    # File 'lib/going/channel.rb', line 13
    def initialize(capacity = 0)
      fail ArgumentError, 'channel capacity must be 0 or greater' if capacity < 0
      @capacity = capacity
      @pushes = []
      @shifts = []
      @closed = false
      @mutex = Mutex.new
      yield self if block_given?
    end
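To make the constructor's two idioms concrete (the capacity guard and `yield self`), here is a minimal sketch. `TinyChannel` is a hypothetical stand-in that copies only those lines from the listing above; it has no buffering or blocking and is not the library's class.

```ruby
# Hypothetical stand-in: only the constructor idiom is reproduced here.
class TinyChannel
  attr_reader :capacity

  def initialize(capacity = 0)
    # Reject negative capacities up front, as the listing above does.
    fail ArgumentError, 'channel capacity must be 0 or greater' if capacity < 0
    @capacity = capacity
    # Hand the new object to the caller's block, if one was given.
    yield self if block_given?
  end
end

unbuffered = TinyChannel.new   # capacity defaults to 0
buffered = TinyChannel.new(3) { |ch| puts "created with capacity #{ch.capacity}" }

begin
  TinyChannel.new(-1)
rescue ArgumentError => e
  puts e.message
end
```

The block form is handy when the new object should be used immediately, for example to start a producer thread right after construction.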
Instance Attribute Details
#capacity ⇒ Object (readonly)
Returns the capacity of the channel.
    # File 'lib/going/channel.rb', line 29
    def capacity
      @capacity
    end
Instance Method Details
#close ⇒ Object
Closes the channel. Any data in the buffer may still be retrieved.
    # File 'lib/going/channel.rb', line 39
    def close
      synchronize do
        return false if closed?

        shifts.each(&:close).clear
        pushes_over_capacity!.each(&:close)
        @closed = true
      end
    end
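The "closed, but buffered data is still retrievable" behaviour has a close analogue in Ruby's standard `Queue`, sketched below. This is not `Going::Channel`: the helper name `drain_after_close` is made up, and a closed, empty `Queue` returns `nil` from `pop`, whereas the `shift` listing on this page throws `:close` instead.

```ruby
# Stdlib analogue only; Queue is NOT Going::Channel.
# Returns the messages drained after closing, and whether a late push was rejected.
def drain_after_close
  queue = Queue.new
  queue.push(:a)
  queue.push(:b)
  queue.close                  # no further pushes are accepted

  rejected = begin
    queue.push(:c)
    false
  rescue ClosedQueueError
    true                       # pushing to a closed queue raises
  end

  drained = []
  while (item = queue.pop)     # pop returns nil once the closed queue is empty
    drained << item
  end

  [drained, rejected]
end

p drain_after_close
```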
#each ⇒ Object
Calls the given block once for each message until the channel is closed, passing that message as a parameter.
Note that this is a destructive action, since each message is removed from the channel with #shift.
    # File 'lib/going/channel.rb', line 133
    def each
      return enum_for(:each) unless block_given?

      catch :close do
        loop do
          yield self.shift
        end
      end
    end
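The `catch :close` / `throw :close` pairing and the destructive nature of `each` can be seen in a small single-threaded sketch. `DrainingList` is hypothetical and simplifies heavily: it treats "no items left" as "closed" and throws immediately, whereas a real channel would wait for more messages until it is closed.

```ruby
# Hypothetical stand-in showing destructive iteration with catch/throw.
class DrainingList
  def initialize(items)
    @items = items.dup
  end

  # Removes and returns the next item; signals the end with throw :close.
  def shift
    throw :close if @items.empty?
    @items.shift
  end

  def each
    return enum_for(:each) unless block_given?

    # The loop only ends when shift throws :close, which catch absorbs.
    catch :close do
      loop { yield shift }
    end
  end

  def size
    @items.size
  end
end

list = DrainingList.new([1, 2, 3])
seen = []
list.each { |x| seen << x }
puts seen.inspect   # prints [1, 2, 3]
puts list.size      # prints 0, because each item was shifted off
```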
#empty? ⇒ Boolean
Returns whether the channel is empty.
    # File 'lib/going/channel.rb', line 123
    def empty?
      size == 0
    end
#inspect ⇒ Object
    # File 'lib/going/channel.rb', line 143
    def inspect
      inspection = [:capacity, :size].map do |attr|
        "#{attr}: #{send(attr).inspect}"
      end
      "#<#{self.class} #{inspection.join(', ')}>"
    end
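Since `#inspect` has no description, a short sketch of the string it builds may help. `InspectDemo` is a hypothetical class that reuses the method body from the listing above with fixed attribute values; only the formatting logic is taken from the source.

```ruby
# Hypothetical class reusing the inspect body shown above.
class InspectDemo
  attr_reader :capacity, :size

  def initialize(capacity, size)
    @capacity = capacity
    @size = size
  end

  def inspect
    # Build "name: value" pairs for each attribute, then wrap them in #<...>.
    inspection = [:capacity, :size].map do |attr|
      "#{attr}: #{send(attr).inspect}"
    end
    "#<#{self.class} #{inspection.join(', ')}>"
  end
end

puts InspectDemo.new(2, 1).inspect   # prints #<InspectDemo capacity: 2, size: 1>
```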
#push(obj, &on_complete) ⇒ Object Also known as: <<, yield
Pushes obj to the channel. If the channel is already full, waits until a thread shifts from it.
    # File 'lib/going/channel.rb', line 53
    def push(obj, &on_complete)
      synchronize do
        push = Push.new(message: obj, select_statement: select_statement, &on_complete)
        pushes << push

        pair_with_shift push

        select_statement.when_complete(push, pushes, &method(:remove_operation)) if select_statement?

        push.complete if under_capacity?
        push.signal if select_statement?
        push.close if closed?

        push.wait(mutex)

        fail 'cannot push to a closed channel' if closed? && !select_statement?
        self
      end
    end
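The listing ends with `self`, which is what allows the `<<` alias to be chained. The sketch below shows only that idiom: `ChainDemo` is hypothetical, stores items in a plain array, and never blocks, so it says nothing about how long a chained push on a real channel would wait.

```ruby
# Hypothetical stand-in showing why returning self enables chaining.
class ChainDemo
  attr_reader :items

  def initialize
    @items = []
  end

  def push(obj)
    @items << obj
    self               # returning the receiver makes `demo << 1 << 2` work
  end
  alias_method :<<, :push
end

demo = ChainDemo.new
demo << 1 << 2         # each << returns demo, so the next << applies to it
demo.push(3)
puts demo.items.inspect   # prints [1, 2, 3]
```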
#shift(&on_complete) ⇒ Object Also known as: receive, next
Receives data from the channel. If the channel is already empty, waits until a thread pushes to it.
    # File 'lib/going/channel.rb', line 83
    def shift(&on_complete)
      synchronize do
        shift = Shift.new(select_statement: select_statement, &on_complete)
        shifts << shift

        pair_with_push shift

        select_statement.when_complete(shift, shifts, &method(:remove_operation)) if select_statement?

        shift.signal if select_statement?
        shift.close if closed?

        shift.wait(mutex)

        throw :close if closed? && !select_statement? && shift.incomplete?
        shift.
      end
    end
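"Waits until a thread pushes to it" can be demonstrated with the standard library's `Queue`, whose `pop` also suspends the caller until a message arrives. This is an analogue rather than `Going::Channel`, and the helper name `receive_in_background` is made up for the sketch.

```ruby
# Stdlib analogue only; Queue is NOT Going::Channel.
# Starts a receiver that waits for a message, then supplies one.
def receive_in_background
  queue = Queue.new
  receiver = Thread.new { queue.pop }   # suspends until a message is available
  queue.push(:hello)                    # makes a message available
  receiver.value                        # joins the thread and returns what it received
end

p receive_in_background
```

The result is the same whichever thread runs first: if the push happens before the receiver starts, `pop` returns at once, otherwise the receiver sleeps until the push wakes it.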
#size ⇒ Object Also known as: length
Returns the number of messages in the channel.
    # File 'lib/going/channel.rb', line 111
    def size
      [capacity, pushes.size].min
    end
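The formula in the listing caps the reported size at the capacity. Assuming `pushes` holds every pending push operation (which the `push` listing suggests, but which is an inference), a few worked values show the consequence. `visible_size` is a hypothetical helper that isolates the arithmetic.

```ruby
# Hypothetical helper isolating the arithmetic from Channel#size.
def visible_size(capacity, pending_pushes)
  [capacity, pending_pushes].min
end

puts visible_size(2, 1)   # prints 1: one buffered message
puts visible_size(2, 5)   # prints 2: pushes beyond the capacity are not counted
puts visible_size(0, 3)   # prints 0: with capacity 0 the size is always 0
```

Under that assumption, a channel created with the default capacity of 0 always reports a size of 0, so `empty?` is always true for it.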