Class: GirlFriday::Batch
- Inherits:
-
Object
- Object
- GirlFriday::Batch
- Defined in:
- lib/girl_friday/batch.rb
Overview
Batch represents a set of operations which can be processed concurrently. Asking for the results of the batch acts as a barrier: the calling thread will block until all operations have completed. Results are guaranteed to be returned in the same order as the operations are given.
Internally a girl_friday queue is created which limits the number of concurrent operations based on the :size option.
TODO Errors are not handled well at all.
Instance Method Summary collapse
-
#initialize(enumerable = nil, options = {}, &block) ⇒ Batch
constructor
A new instance of Batch.
- #push(msg) ⇒ Object (also: #<<)
- #results(timeout = nil) ⇒ Object
Constructor Details
#initialize(enumerable = nil, options = {}, &block) ⇒ Batch
Returns a new instance of Batch.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/girl_friday/batch.rb', line 15 def initialize(enumerable=nil, ={}, &block) @queue = GirlFriday::Queue.new(:batch, , &block) @complete = 0 @size = 0 @results = [] if enumerable @size = enumerable.count @results = Array.new(@size) end @lock = Mutex.new @condition = ConditionVariable.new @frozen = false start(enumerable) end |
Instance Method Details
#push(msg) ⇒ Object Also known as: <<
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/girl_friday/batch.rb', line 39 def push(msg) raise ArgumentError, "Batch is frozen, you cannot push more items into it" if @frozen @lock.synchronize do @results << nil @size += 1 index = @results.size - 1 @queue.push(msg) do |result| completion(result, index) end end end |
#results(timeout = nil) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/girl_friday/batch.rb', line 30 def results(timeout=nil) @frozen = true @lock.synchronize do @condition.wait(@lock, timeout) if @complete != @size @queue.shutdown @results end end |