Class: Enumerator::Async
- Inherits:
-
Enumerator
- Object
- Enumerator
- Enumerator::Async
- Defined in:
- lib/enumerator/async.rb
Instance Method Summary collapse
- #each ⇒ Object
-
#initialize(enum, pool_size = nil) ⇒ Async
constructor
A new instance of Async.
- #map ⇒ Object
- #size ⇒ Object
- #sync ⇒ Object (also: #to_enum)
- #to_a ⇒ Object
- #with_index(start = 0, &work) ⇒ Object
- #with_object(object, &work) ⇒ Object
Constructor Details
#initialize(enum, pool_size = nil) ⇒ Async
Returns a new instance of Async.
9 10 11 12 13 14 15 16 17 |
# File 'lib/enumerator/async.rb', line 9 def initialize(enum, pool_size = nil) pool_size = (pool_size || enum.count).to_i unless pool_size >= 1 = "Thread pool size is invalid! Expected a positive integer but got: #{pool_size}" raise ArgumentError, end @enum, @pool_size = enum, pool_size end |
Instance Method Details
#each ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/enumerator/async.rb', line 32 def each raise_error(:each) unless block_given? queue = SizedQueue.new @pool_size threads = @pool_size.times.map do Thread.new do loop do item = queue.pop item != EOQ ? yield(item) : break end end end begin loop { queue.push @enum.next } rescue StopIteration ensure @pool_size.times { queue.push EOQ } end threads.each(&:join) @enum.rewind self end |
#map ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/enumerator/async.rb', line 68 def map raise_error(:map) unless block_given? [].tap do |outs| with_index do |item, index| outs[index] = yield(item) end end end |
#size ⇒ Object
28 29 30 |
# File 'lib/enumerator/async.rb', line 28 def size @enum.size end |
#sync ⇒ Object Also known as: to_enum
23 24 25 |
# File 'lib/enumerator/async.rb', line 23 def sync @enum end |
#to_a ⇒ Object
19 20 21 |
# File 'lib/enumerator/async.rb', line 19 def to_a @enum.to_a end |
#with_index(start = 0, &work) ⇒ Object
58 59 60 61 |
# File 'lib/enumerator/async.rb', line 58 def with_index(start = 0, &work) @enum = @enum.with_index(start) block_given? ? each(&work) : self end |
#with_object(object, &work) ⇒ Object
63 64 65 66 |
# File 'lib/enumerator/async.rb', line 63 def with_object(object, &work) @enum = @enum.with_object(object) block_given? ? (each(&work) and object) : self end |