Class: Async::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/async/channel.rb

Defined Under Namespace

Classes: ChannelClosedError, Error

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit = 1, **options) ⇒ Channel

Returns a new instance of Channel.



10
11
12
13
14
15
16
# File 'lib/async/channel.rb', line 10

# Builds a channel backed by an Async::Q.
#
# @param limit [Object] forwarded as the first positional argument to Async::Q.new
# @param options [Hash] keyword options forwarded verbatim to Async::Q.new;
#   the :parent entry (nil when absent) is additionally kept as the default
#   parent used by #async
def initialize(limit = 1, **options)
  # Bookkeeping state: the channel starts open with nobody iterating it.
  @closed = false
  @subscribers = 0
  @parent = options[:parent]

  @queue = Async::Q.new(limit, **options)
end

Instance Attribute Details

#subscribers ⇒ Object (readonly)

Returns the value of attribute subscribers.



4
5
6
# File 'lib/async/channel.rb', line 4

# Current value of the subscriber counter (@subscribers).
def subscribers = @subscribers

Instance Method Details

#<<(item) ⇒ Object



27
# File 'lib/async/channel.rb', line 27

# Operator form of #enqueue; returns whatever #enqueue returns.
#
# @param item [Object] the message to send
def <<(item)
  enqueue(item)
end

#async(parent: (@parent || Async::Task.current), &block) ⇒ Object



80
81
82
83
84
# File 'lib/async/channel.rb', line 80

# Iterates the channel with #each and hands every item, together with the
# given block, to parent.async.
#
# @param parent [#async] defaults to the :parent option given to the
#   constructor, falling back to Async::Task.current
# @return [Object] whatever #each returns
def async(parent: (@parent || Async::Task.current), &block)
  each { |item| parent.async(item, &block) }
end

#close ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/async/channel.rb', line 47

# Marks the channel as closed and enqueues one [:close] sentinel per current
# subscriber, so every active #each loop dequeues a sentinel and stops.
#
# Closing is idempotent: calling #close on an already-closed channel does
# nothing, so no extra sentinels are left lingering in the queue.
#
# @return [Integer, nil] the subscriber count on the first call, nil on
#   subsequent calls
def close
  return if @closed

  @closed = true

  # NOTE(review): presumably grows the queue so the sentinels fit even when
  # it is at its limit — confirm against Async::Q#expand.
  @queue.expand(@subscribers)

  @subscribers.times do
    @queue << [:close]
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


24
# File 'lib/async/channel.rb', line 24

# Whether #close has been called on this channel (value of @closed).
def closed?
  @closed
end

#count ⇒ Object



18
# File 'lib/async/channel.rb', line 18

# Delegates to the underlying queue's #count.
def count
  @queue.count
end

#dequeue ⇒ Object

Raises:

  • (ChannelClosedError)

57
58
59
60
61
62
63
64
65
# File 'lib/async/channel.rb', line 57

# Takes the next entry off the underlying queue and unwraps it.
#
# Queue entries are [type, payload] pairs: a :close entry signals that the
# channel was closed, an :error entry carries an exception to re-raise, and
# any other entry carries a regular message.
#
# @return [Object] the payload of the dequeued entry
# @raise [ChannelClosedError] when a :close entry is dequeued
# @raise [Exception] the payload of a dequeued :error entry
def dequeue
  check_channel_readable!

  kind, payload = @queue.dequeue
  if kind == :close
    raise ChannelClosedError, "Channel was closed"
  elsif kind == :error
    raise payload # TODO: fix backtrace
  end

  payload
end

#each ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/async/channel.rb', line 67

# Subscribes to the channel and yields each dequeued message to the block.
#
# The subscriber counter is incremented for the duration of the iteration;
# #close reads that counter to decide how many [:close] sentinels to enqueue.
#
# Iteration ends when ChannelClosedError is raised (it is swallowed here) or
# when a dequeued message is nil/false, because the `while` condition then
# turns falsy. Any other error propagates to the caller.
#
# @yield [message] each message returned by #dequeue
# @return [nil]
def each
  # This check may raise before we are counted as a subscriber, so it must
  # stay outside the begin/ensure below; otherwise the ensure would decrement
  # a counter that was never incremented and leave it negative.
  check_channel_readable!

  @subscribers += 1
  begin
    while message = dequeue # rubocop:disable Lint/AssignmentInCondition
      yield message
    end
  ensure
    @subscribers -= 1
  end
rescue ChannelClosedError
  nil
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


23
# File 'lib/async/channel.rb', line 23

# Delegates to the underlying queue's #empty?.
def empty?
  @queue.empty?
end

#enqueue(message) ⇒ Object



29
30
31
32
33
# File 'lib/async/channel.rb', line 29

# Sends a single message through the channel.
#
# The message is wrapped as a [:message, payload] pair before being pushed
# onto the underlying queue; #dequeue unwraps it on the receiving side.
#
# @param message [Object] the payload to send
# @return [Object] the result of pushing onto the queue with <<
def enqueue(message)
  check_channel_writeable!

  envelope = [:message, message]
  @queue << envelope
end

#enqueue_all(messages) ⇒ Object



35
36
37
38
39
# File 'lib/async/channel.rb', line 35

# Sends several messages through the channel in one call.
#
# Every element is wrapped as a [:message, payload] pair and the resulting
# array is handed to the underlying queue's #enqueue_all.
#
# @param messages [#map] the payloads to send
# @return [Object] whatever the queue's #enqueue_all returns
def enqueue_all(messages)
  check_channel_writeable!

  envelopes = messages.map { |payload| [:message, payload] }
  @queue.enqueue_all(envelopes)
end

#error(e) ⇒ Object



41
42
43
44
45
# File 'lib/async/channel.rb', line 41

# Sends an error through the channel.
#
# The value is pushed as an [:error, e] pair; #dequeue raises the payload of
# such a pair on the receiving side.
#
# @param e [Exception] the error to deliver to a receiver
# @return [Object] the result of pushing onto the queue with <<
def error(e)
  check_channel_writeable!

  failure = [:error, e]
  @queue << failure
end

#full? ⇒ Boolean

Returns:

  • (Boolean)


22
# File 'lib/async/channel.rb', line 22

# Delegates to the underlying queue's #full?.
def full?
  @queue.full?
end

#length ⇒ Object



19
# File 'lib/async/channel.rb', line 19

# Delegates to the underlying queue's #length.
def length
  @queue.length
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


25
# File 'lib/async/channel.rb', line 25

# Logical negation of #closed?.
def open?
  !closed?
end

#size ⇒ Object



20
# File 'lib/async/channel.rb', line 20

# Delegates to the underlying queue's #size.
def size
  @queue.size
end