Class: Enumerator::Async

Inherits:
Enumerator show all
Defined in:
lib/enumerator/async.rb

Instance Method Summary collapse

Constructor Details

#initialize(enum, pool_size = nil) ⇒ Async

Returns a new instance of Async.



9
10
11
12
13
14
15
16
17
# File 'lib/enumerator/async.rb', line 9

def initialize(enum, pool_size = nil)
  pool_size = (pool_size || enum.count).to_i
  unless pool_size >= 1
    message = "Thread pool size is invalid! Expected a positive integer but got: #{pool_size}"
    raise ArgumentError, message
  end

  @enum, @pool_size = enum, pool_size
end

Instance Method Details

#eachObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/enumerator/async.rb', line 32

def each
  raise_error(:each) unless block_given?
  
  queue = SizedQueue.new @pool_size
          
  threads = @pool_size.times.map do
    Thread.new do
      loop do
        item = queue.pop
        item != EOQ ? yield(item) : break
      end
    end
  end
  
  begin
    loop { queue.push @enum.next }
  rescue StopIteration
  ensure
    @pool_size.times { queue.push EOQ }
  end

  threads.each(&:join)
  @enum.rewind
  self
end

#mapObject



68
69
70
71
72
73
74
75
76
# File 'lib/enumerator/async.rb', line 68

def map
  raise_error(:map) unless block_given?
  
  [].tap do |outs|
    with_index do |item, index|
      outs[index] = yield(item)
    end
  end
end

#sizeObject



28
29
30
# File 'lib/enumerator/async.rb', line 28

def size
  @enum.size
end

#syncObject Also known as: to_enum



23
24
25
# File 'lib/enumerator/async.rb', line 23

def sync
  @enum
end

#to_aObject



19
20
21
# File 'lib/enumerator/async.rb', line 19

def to_a
  @enum.to_a
end

#with_index(start = 0, &work) ⇒ Object



58
59
60
61
# File 'lib/enumerator/async.rb', line 58

def with_index(start = 0, &work)
  @enum = @enum.with_index(start)
  block_given? ? each(&work) : self
end

#with_object(object, &work) ⇒ Object



63
64
65
66
# File 'lib/enumerator/async.rb', line 63

def with_object(object, &work)
  @enum = @enum.with_object(object)
  block_given? ? (each(&work) and object) : self
end