Class: ThreadedEnum

Inherits:
Object
  • Object
show all
Defined in:
lib/dream-ops/utils/threaded_enum.rb

Overview

Credit: github.com/bittrance/rxruby/blob/master/lib/rx/concurrency/threaded_enumerator.rb

ThreadedEnum can be used across threads unlike Ruby’s default Enumerator that will throw FiberError if the enumerator is used from more than one thread.

Defined Under Namespace

Classes: Yielder

Constant Summary collapse

ERROR =
Object.new
DONE =
Object.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_or_size_hint = nil, &block) ⇒ ThreadedEnum

The enumerator can be created with either an enumerable or a block that receives a yielder object, but not both. Note that the block or enumerable will be iterated immediately once making it possible to prepare the iterator e.g. when reading from a file or a socket.

Raises:

  • (TypeError)


36
37
38
39
40
41
42
43
# File 'lib/dream-ops/utils/threaded_enum.rb', line 36

def initialize(source_or_size_hint = nil, &block)
  raise TypeError, 'Size hinting not supported' if source_or_size_hint && block_given?
  @condition = ConditionVariable.new
  @gate = Mutex.new
  @queue = Queue.new
  @done = false
  setup_yielder(source_or_size_hint, &block)
end

Class Method Details

.new(*args, &block) ⇒ Object



11
12
13
# File 'lib/dream-ops/utils/threaded_enum.rb', line 11

def self.new(*args, &block)
  Enumerator.new(*args, &block)
end

Instance Method Details

#nextObject

Receive the next item from the enumerator or any exception thrown from the enumerator.

Raises:

  • (StopIteration)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/dream-ops/utils/threaded_enum.rb', line 47

def next
  raise StopIteration if @done
  @gate.synchronize do
    @condition.signal
  end
  payload, type = @queue.pop
  case type
  when DONE
    @done = true
    raise StopIteration
  when ERROR
    @done = true
    raise payload
  end
  payload
end