Class: Async::Q

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/async/q.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit = Float::INFINITY, items: [], parent: nil) ⇒ Q

Returns a new instance of Q.

Raises:

  • (ArgumentError)


12
13
14
15
16
17
18
19
20
21
# File 'lib/async/q.rb', line 12

# Builds a queue with an optional capacity and optional initial contents.
#
# @param limit [Numeric, nil] maximum number of items; nil is treated as unlimited
# @param items [Array] initial contents; the object is stored as given, not copied
# @param parent [Object, nil] stored in @parent, which #async uses as its default parent
# @raise [ArgumentError] when the initial items already exceed the limit
def initialize(limit = Float::INFINITY, items: [], parent: nil)
  # An explicit nil limit falls back to "no limit".
  @limit = limit || Float::INFINITY
  @parent = parent

  if items.count > @limit
    raise ArgumentError, "Items size is greater than limit: #{items.count} > #{@limit}"
  end

  @items = items
  # Two notifications: #dequeue waits on @any_notification while the queue is
  # empty, and #enqueue waits on @free_notification while it is full.
  @any_notification = Async::Notification.new
  @free_notification = Async::Notification.new
end

Instance Attribute Details

#items ⇒ Object (readonly)

Returns the value of attribute items.



10
11
12
# File 'lib/async/q.rb', line 10

# Reader for the collection that backs the queue.
#
# @return [Object] whatever is currently held in @items
def items = @items

#limit ⇒ Object (readonly)

Returns the value of attribute limit.



10
11
12
# File 'lib/async/q.rb', line 10

# Reader for the queue's capacity.
#
# @return [Object] whatever is currently held in @limit
def limit = @limit

Instance Method Details

#<<(item) ⇒ Object



32
# File 'lib/async/q.rb', line 32

# Operator form of #enqueue.
#
# @param item [Object] the value to add
# @return [Object] whatever #enqueue returns
def <<(item)
  enqueue(item)
end

#async(parent: (@parent || Task.current), &block) ⇒ Object



72
73
74
75
76
# File 'lib/async/q.rb', line 72

# Iterates the queue with #each and, for every item, calls +parent.async+
# with that item and the given block.
#
# @param parent [#async] receiver of the +async+ calls; defaults to @parent,
#   or Task.current when @parent is nil or false
# @return [Object] whatever #each returns
def async(parent: (@parent || Task.current), &block)
  each { |item| parent.async(item, &block) }
end

#count ⇒ Object



23
# File 'lib/async/q.rb', line 23

# Number of queued items, delegated to +@items.count+.
#
# @return [Integer]
def count
  @items.count
end

#dequeue ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/async/q.rb', line 56

# Removes and returns the item at the front of the queue.
#
# While the queue is empty, waits on @any_notification and re-checks after
# every wake-up. After removing an item, signals @free_notification.
# NOTE(review): presumably +wait+ suspends the calling task until #enqueue signals — confirm against Async::Notification.
#
# @return [Object] the removed item
def dequeue
  while empty?
    @any_notification.wait
  end

  # Shift first, then signal, then hand the shifted item back.
  @items.shift.tap { @free_notification.signal }
end

#each ⇒ Object



66
67
68
69
70
# File 'lib/async/q.rb', line 66

# Repeatedly dequeues items and yields each one to the block.
#
# Iteration stops as soon as #dequeue returns nil or false; that value is
# consumed from the queue but not yielded.
#
# @yieldparam item [Object] the dequeued item
# @return [nil]
def each
  item = dequeue
  while item
    yield item
    item = dequeue
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


28
# File 'lib/async/q.rb', line 28

# Whether the queue currently holds no items, delegated to +@items.empty?+.
#
# @return [Boolean]
def empty?
  @items.empty?
end

#enqueue(item) ⇒ Object



47
48
49
50
51
52
# File 'lib/async/q.rb', line 47

# Appends an item to the back of the queue.
#
# While the queue is full, waits on @free_notification and re-checks after
# every wake-up. After pushing, signals @any_notification.
# NOTE(review): presumably +wait+ suspends the calling task until space is freed — confirm against Async::Notification.
#
# @param item [Object] the value to add
# @return [Object] the result of +@any_notification.signal+
def enqueue(item)
  while full?
    @free_notification.wait
  end

  @items.push(item)
  @any_notification.signal
end

#enqueue_all(items) ⇒ Object



54
# File 'lib/async/q.rb', line 54

# Enqueues every element of +items+, one at a time and in order, via #enqueue.
#
# @param items [#each] the values to add
# @return [Object] the result of +items.each+ (for an Array, +items+ itself)
def enqueue_all(items)
  items.each do |entry|
    enqueue(entry)
  end
end

#expand(n) ⇒ Object



30
# File 'lib/async/q.rb', line 30

# Raises the limit by +n+ by calling #resize with +limit + n+.
#
# @param n [Numeric] amount to add to the current limit
# @return [Object] whatever #resize returns
def expand(n)
  resize(limit + n)
end

#full? ⇒ Boolean

Returns:

  • (Boolean)


27
# File 'lib/async/q.rb', line 27

# Whether the queue has reached its limit (+size >= @limit+).
#
# @return [Boolean]
def full?
  size >= @limit
end

#length ⇒ Object



25
# File 'lib/async/q.rb', line 25

# Number of queued items, delegated to +@items.length+.
#
# @return [Integer]
def length
  @items.length
end

#resize(new_limit) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/async/q.rb', line 34

# Changes the queue's limit.
#
# Growing the limit signals @free_notification after the new limit is stored.
# A limit that is not larger than the current one must be positive and must
# not be smaller than the current size.
#
# @param new_limit [Numeric] the limit to apply
# @return [Object] the result of +@free_notification.signal+ when growing,
#   otherwise +new_limit+
# @raise [ArgumentError] when +new_limit+ is <= 0, or is lower than the current size
def resize(new_limit)
  # Growth is checked first, before any validation, and returns early.
  if new_limit > @limit
    @limit = new_limit
    return @free_notification.signal
  end

  raise ArgumentError, "Limit cannot be <= 0: #{new_limit}" if new_limit <= 0
  raise ArgumentError, "New limit cannot be lower than the current size: #{size} > #{new_limit}" if size > new_limit

  @limit = new_limit
end

#shrink(n) ⇒ Object



31
# File 'lib/async/q.rb', line 31

# Lowers the limit by +n+ by calling #resize with +limit - n+.
#
# @param n [Numeric] amount to subtract from the current limit
# @return [Object] whatever #resize returns
def shrink(n)
  resize(limit - n)
end

#size ⇒ Object



24
# File 'lib/async/q.rb', line 24

# Number of queued items, delegated to +@items.size+.
#
# @return [Integer]
def size
  @items.size
end