Class: IOPromise::Deferred::DeferredExecutorPool
- Inherits: ExecutorPool::Batch
  - Object
    - ExecutorPool::Base
      - ExecutorPool::Batch
        - IOPromise::Deferred::DeferredExecutorPool
- Defined in: lib/iopromise/deferred/executor_pool.rb
Instance Attribute Summary
Attributes inherited from ExecutorPool::Base
Instance Method Summary
- #execute_continue ⇒ Object
- #initialize ⇒ DeferredExecutorPool (constructor): A new instance of DeferredExecutorPool.
Methods inherited from ExecutorPool::Batch
Methods inherited from ExecutorPool::Base
#begin_executing, for, #promise_cancelled, #promise_fulfilled, #promise_rejected, #register, #sync
Constructor Details
#initialize ⇒ DeferredExecutorPool
Returns a new instance of DeferredExecutorPool.
    # File 'lib/iopromise/deferred/executor_pool.rb', line 6
    def initialize(*)
      super

      # register a dummy reader that never fires, to indicate to the event loop that
      # there is a valid, active ExecutorPool.
      @pipe_rd, @pipe_wr = IO.pipe
      @iop_monitor = ::IOPromise::ExecutorContext.current.register_observer_io(self, @pipe_rd, :r)
    end
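The "dummy reader that never fires" relies on ordinary pipe behaviour: the read end of a pipe only becomes readable once something is written to it or the write end is closed. The following is a minimal standalone sketch of that behaviour using only `IO.pipe` and `IO.select` from the Ruby core library. It does not use IOPromise itself, and the variable names and the 50 ms timeout are assumptions chosen for illustration.

```ruby
# Standalone illustration, not IOPromise code.
rd, wr = IO.pipe

# Nothing has been written and the write end is still open, so the read end
# is not readable: IO.select gives up after the timeout and returns nil.
ready = IO.select([rd], nil, nil, 0.05)
never_fired = ready.nil?

# For contrast, writing a byte makes the read end readable.
wr.write("x")
ready_after_write = IO.select([rd], nil, nil, 0.05)
fired_after_write = !ready_after_write.nil? && ready_after_write[0].include?(rd)

rd.close
wr.close
```

Because the constructor keeps `@pipe_wr` open and never writes to it, the registered read end behaves like the first `IO.select` call here: it stays registered but never reports readiness.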
Instance Method Details
#execute_continue ⇒ Object
    # File 'lib/iopromise/deferred/executor_pool.rb', line 15
    def execute_continue
      if @current_batch.empty?
        next_batch
      end

      # we are just running this in the sync cycle, in a blocking way.
      timeouts = []
      @current_batch.each do |promise|
        time_until_execution = promise.time_until_execution
        if time_until_execution <= 0
          begin_executing(promise)
          promise.run_deferred
        else
          timeouts << time_until_execution
        end
      end

      if timeouts.empty?
        @select_timeout = nil
      else
        # ensure we get back to this loop not too long after
        @select_timeout = timeouts.min
      end

      # we reset the batch - the promises that are not completed will still be
      # pending and will be available next time we are called.
      @current_batch = []
    end
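The core of `execute_continue` is a single pass that runs every promise whose delay has elapsed and records the shortest remaining delay as the next select timeout. The sketch below reproduces only that pass outside the library. `DemoDeferred`, `run_due`, and the explicit `now` argument are hypothetical stand-ins introduced here so the example is deterministic; the real `time_until_execution` takes no arguments, and the real method also calls `begin_executing` and resets `@current_batch`.

```ruby
# Hypothetical stand-in for a deferred promise; only the two methods the loop
# calls are modelled. `run_at` and `now` are timestamps in seconds.
DemoDeferred = Struct.new(:name, :run_at, :ran) do
  def time_until_execution(now)
    run_at - now
  end

  def run_deferred
    self.ran = true
  end
end

# Runs every due item and returns the next select timeout: nil when nothing
# is still waiting, otherwise the shortest remaining delay.
def run_due(batch, now)
  timeouts = []
  batch.each do |item|
    remaining = item.time_until_execution(now)
    if remaining <= 0
      item.run_deferred
    else
      timeouts << remaining
    end
  end
  timeouts.empty? ? nil : timeouts.min
end

now = 100.0
batch = [
  DemoDeferred.new("due", 99.5, false),
  DemoDeferred.new("soon", 100.25, false),
  DemoDeferred.new("later", 102.0, false),
]
select_timeout = run_due(batch, now)
```

In this run only the first item executes, and the returned timeout is the delay of the nearest pending item, so the caller knows to come back to the loop no later than that.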