Class: Going::Channel

Inherits:
Object
  • Object
show all
Extended by:
BooleanAttrReader
Defined in:
lib/going/channel.rb

Overview

This class represents message channels of specified capacity. The push operation may be blocked if the capacity is full. The shift operation may be blocked if no messages have been sent.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BooleanAttrReader

battr_reader

Constructor Details

#initialize(capacity = 0) {|_self| ... } ⇒ Channel

Creates a fixed-length channel with a capacity of capacity.

Yields:

  • (_self)

Yield Parameters:



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/going/channel.rb', line 13

def initialize(capacity = 0)
  fail ArgumentError, 'channel capacity must be 0 or greater' if capacity < 0
  @capacity = capacity

  @pushes = []
  @shifts = []

  @closed = false
  @mutex = Mutex.new

  yield self if block_given?
end

Instance Attribute Details

#capacityObject (readonly)

Returns the capacity of the channel.



29
30
31
# File 'lib/going/channel.rb', line 29

def capacity
  @capacity
end

Instance Method Details

#closeObject

Closes the channel. Any data in the buffer may still be retrieved.



39
40
41
42
43
44
45
46
47
# File 'lib/going/channel.rb', line 39

def close
  synchronize do
    return false if closed?

    shifts.each(&:close).clear
    pushes_over_capacity!.each(&:close)
    @closed = true
  end
end

#eachObject

Calls the given block once for each message until the channel is closed, passing that message as a parameter.

Note that this is a destructive action, since each message is ‘shift`ed.



133
134
135
136
137
138
139
140
141
# File 'lib/going/channel.rb', line 133

def each
  return enum_for(:each) unless block_given?

  catch :close do
    loop do
      yield self.shift
    end
  end
end

#empty?Boolean

Returns whether the channel is empty.

Returns:

  • (Boolean)


123
124
125
# File 'lib/going/channel.rb', line 123

def empty?
  size == 0
end

#inspectObject



143
144
145
146
147
148
# File 'lib/going/channel.rb', line 143

def inspect
  inspection = [:capacity, :size].map do |attr|
    "#{attr}: #{send(attr).inspect}"
  end
  "#<#{self.class} #{inspection.join(', ')}>"
end

#push(obj, &on_complete) ⇒ Object Also known as: <<, yield

Pushes obj to the channel. If the channel is already full, waits until a thread shifts from it.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/going/channel.rb', line 53

def push(obj, &on_complete)
  synchronize do
    push = Push.new(message: obj, select_statement: select_statement, &on_complete)
    pushes << push

    pair_with_shift push

    select_statement.when_complete(push, pushes, &method(:remove_operation)) if select_statement?

    push.complete if under_capacity?
    push.signal if select_statement?
    push.close if closed?

    push.wait(mutex)

    fail 'cannot push to a closed channel' if closed? && !select_statement?
    self
  end
end

#shift(&on_complete) ⇒ Object Also known as: receive, next

Receives data from the channel. If the channel is already empty, waits until a thread pushes to it.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/going/channel.rb', line 83

def shift(&on_complete)
  synchronize do
    shift = Shift.new(select_statement: select_statement, &on_complete)
    shifts << shift

    pair_with_push shift

    select_statement.when_complete(shift, shifts, &method(:remove_operation)) if select_statement?

    shift.signal if select_statement?
    shift.close if closed?

    shift.wait(mutex)

    throw :close if closed? && !select_statement? && shift.incomplete?
    shift.message
  end
end

#sizeObject Also known as: length

Returns the number of messages in the channel



111
112
113
# File 'lib/going/channel.rb', line 111

def size
  [capacity, pushes.size].min
end