Class: Async::Channel
- Inherits:
-
Object
show all
- Defined in:
- lib/async/channel.rb
Defined Under Namespace
Classes: ChannelClosedError, Error
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(limit = 1, **options) ⇒ Channel
Returns a new instance of Channel.
10
11
12
13
14
15
16
|
# File 'lib/async/channel.rb', line 10
# Builds a channel on top of a freshly created Async::Q.
#
# @param limit first positional argument handed to Async::Q.new (defaults to 1)
# @param options [Hash] keyword options forwarded unchanged to Async::Q.new;
#   the :parent entry, when present, is also remembered and used by #async
#   as its default parent
def initialize(limit = 1, **options)
  @parent = options[:parent]
  @subscribers = 0 # no consumer is iterating yet
  @queue = Async::Q.new(limit, **options)
  @closed = false
end
|
Instance Attribute Details
#subscribers ⇒ Object
Returns the value of attribute subscribers.
4
5
6
|
# File 'lib/async/channel.rb', line 4
# Current value of the subscriber counter (incremented and decremented by #each).
#
# @return [Integer]
def subscribers = @subscribers
|
Instance Method Details
#<<(item) ⇒ Object
27
|
# File 'lib/async/channel.rb', line 27
# Shovel-operator alias for #enqueue.
#
# @param item the message to put on the channel
# @return whatever #enqueue returns
def <<(item)
  enqueue(item)
end
|
#async(parent: (@parent || Async::Task.current), &block) ⇒ Object
80
81
82
83
84
|
# File 'lib/async/channel.rb', line 80
# Consumes the channel with #each and, for every message received, calls
# parent.async(message, &block).
#
# @param parent object responding to #async; defaults to the parent given at
#   construction, falling back to Async::Task.current
# @yield handed to parent.async together with each message
# @return whatever #each returns
def async(parent: (@parent || Async::Task.current), &block)
  each { |item| parent.async(item, &block) }
end
|
#close ⇒ Object
47
48
49
50
51
52
53
54
55
|
# File 'lib/async/channel.rb', line 47
# Marks the channel as closed and pushes one [:close] marker onto the queue
# for every current subscriber; #dequeue turns such a marker into a
# ChannelClosedError.
#
# @return [Integer] the subscriber count (the result of Integer#times)
def close
  @closed = true
  # NOTE(review): expand presumably enlarges the queue so the markers below
  # fit regardless of the limit — confirm against Async::Q#expand.
  @queue.expand(@subscribers)
  @subscribers.times { @queue << [:close] }
end
|
#closed? ⇒ Boolean
24
|
# File 'lib/async/channel.rb', line 24
# Whether #close has been called on this channel.
#
# @return [Boolean] the stored closed flag
def closed?
  @closed
end
|
#count ⇒ Object
18
|
# File 'lib/async/channel.rb', line 18
# Delegates to the underlying queue's #count.
#
# @return whatever the queue reports as its count
def count
  @queue.count
end
|
#dequeue ⇒ Object
57
58
59
60
61
62
63
64
65
|
# File 'lib/async/channel.rb', line 57
# Takes the next entry off the queue and unwraps it.
#
# Entries are [type, payload] pairs: a :close entry raises, an :error entry
# re-raises its payload, and anything else yields the payload itself.
#
# @return the payload of the next entry
# @raise [ChannelClosedError] when the next entry is a :close marker
# @raise the stored payload when the next entry is an :error entry
def dequeue
  check_channel_readable!
  kind, payload = @queue.dequeue
  if kind == :close
    raise ChannelClosedError, "Channel was closed"
  elsif kind == :error
    raise payload
  else
    payload
  end
end
|
#each ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/async/channel.rb', line 67
# Yields every message dequeued from the channel until it is closed.
#
# The caller is counted in @subscribers for as long as the loop runs; #close
# uses that counter to decide how many [:close] markers to enqueue.
#
# @yield [message] each dequeued message
# @return [nil] both when the loop ends and when ChannelClosedError is raised
# @raise any other error raised by check_channel_readable! or #dequeue
def each
  check_channel_readable!
  @subscribers += 1
  begin
    # A nil or false message also ends the loop, exactly as the assignment
    # in the loop condition dictates.
    while (message = dequeue)
      yield message
    end
  ensure
    # Scoped to the loop only: if the readability check above raises, the
    # counter was never incremented and must not be decremented.
    @subscribers -= 1
  end
rescue ChannelClosedError
  nil
end
|
#empty? ⇒ Boolean
23
|
# File 'lib/async/channel.rb', line 23
# Delegates to the underlying queue's #empty?.
#
# @return [Boolean] the queue's answer
def empty?
  @queue.empty?
end
|
#enqueue(message) ⇒ Object
29
30
31
32
33
|
# File 'lib/async/channel.rb', line 29
# Puts one message on the channel, wrapped as a [:message, message] entry.
#
# @param message the payload to deliver
# @return the result of pushing onto the underlying queue
# @raise whatever check_channel_writeable! raises when writing is not allowed
def enqueue(message)
  check_channel_writeable!
  entry = [:message, message]
  @queue << entry
end
|
#enqueue_all(messages) ⇒ Object
35
36
37
38
39
|
# File 'lib/async/channel.rb', line 35
# Puts several messages on the channel in one call, each wrapped as a
# [:message, payload] entry.
#
# @param messages collection responding to #map
# @return the result of the underlying queue's #enqueue_all
# @raise whatever check_channel_writeable! raises when writing is not allowed
def enqueue_all(messages)
  check_channel_writeable!
  entries = messages.map { |payload| [:message, payload] }
  @queue.enqueue_all(entries)
end
|
#error(e) ⇒ Object
41
42
43
44
45
|
# File 'lib/async/channel.rb', line 41
# Puts an error on the channel as an [:error, e] entry; #dequeue raises the
# payload of such an entry when it reaches it.
#
# @param e the exception (or other raisable value) to deliver
# @return the result of pushing onto the underlying queue
# @raise whatever check_channel_writeable! raises when writing is not allowed
def error(e)
  check_channel_writeable!
  entry = [:error, e]
  @queue << entry
end
|
#full? ⇒ Boolean
22
|
# File 'lib/async/channel.rb', line 22
# Delegates to the underlying queue's #full?.
#
# @return [Boolean] the queue's answer
def full?
  @queue.full?
end
|
#length ⇒ Object
19
|
# File 'lib/async/channel.rb', line 19
# Delegates to the underlying queue's #length.
#
# @return whatever the queue reports as its length
def length
  @queue.length
end
|
#open? ⇒ Boolean
25
|
# File 'lib/async/channel.rb', line 25
# Negation of #closed?.
#
# @return [Boolean] true while the channel has not been closed
def open?
  !closed?
end
|
#size ⇒ Object
20
|
# File 'lib/async/channel.rb', line 20
# Delegates to the underlying queue's #size.
#
# @return whatever the queue reports as its size
def size
  @queue.size
end
|