Class: IOPromise::Deferred::DeferredExecutorPool

Inherits:
ExecutorPool::Batch show all
Defined in:
lib/iopromise/deferred/executor_pool.rb

Instance Attribute Summary

Attributes inherited from ExecutorPool::Base

#select_timeout

Instance Method Summary collapse

Methods inherited from ExecutorPool::Batch

#next_batch

Methods inherited from ExecutorPool::Base

#begin_executing, for, #promise_cancelled, #promise_fulfilled, #promise_rejected, #register, #sync

Constructor Details

#initializeDeferredExecutorPool

Returns a new instance of DeferredExecutorPool.



6
7
8
9
10
11
12
13
# File 'lib/iopromise/deferred/executor_pool.rb', line 6

def initialize(*)
  super

  # register a dummy reader that never fires, to indicate to the event loop that
  # there is a valid, active ExecutorPool.
  @pipe_rd, @pipe_wr = IO.pipe
  @iop_monitor = ::IOPromise::ExecutorContext.current.register_observer_io(self, @pipe_rd, :r)
end

Instance Method Details

#execute_continueObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/iopromise/deferred/executor_pool.rb', line 15

def execute_continue
  if @current_batch.empty?
    next_batch
  end

  # we are just running this in the sync cycle, in a blocking way.
  timeouts = []
  @current_batch.each do |promise|
    time_until_execution = promise.time_until_execution
    if time_until_execution <= 0
      begin_executing(promise)
      promise.run_deferred
    else
      timeouts << time_until_execution
    end
  end

  if timeouts.empty?
    @select_timeout = nil
  else
    # ensure we get back to this loop not too long after 
    @select_timeout = timeouts.min
  end

  # we reset the batch - the promises that are not completed will still be
  # pending and will be available next time we are called.
  @current_batch = []
end