Class: IOPromise::ExecutorPool::Base

Inherits:
Object
  • Object
show all
Includes:
Promise::Observer
Defined in:
lib/iopromise/executor_pool/base.rb

Direct Known Subclasses

Batch, Sequential

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_pool) ⇒ Base

Returns a new instance of Base.



19
20
21
22
23
24
25
26
# File 'lib/iopromise/executor_pool/base.rb', line 19

def initialize(connection_pool)
  @connection_pool = connection_pool
  @pending = []

  @monitors = {}

  @select_timeout = nil
end

Instance Attribute Details

#select_timeoutObject

Returns the value of attribute select_timeout.



17
18
19
# File 'lib/iopromise/executor_pool/base.rb', line 17

def select_timeout
  @select_timeout
end

Class Method Details

.for(connection_pool) ⇒ Object



11
12
13
14
# File 'lib/iopromise/executor_pool/base.rb', line 11

def for(connection_pool)
  @executors ||= {}
  @executors[connection_pool] ||= new(connection_pool)
end

Instance Method Details

#begin_executing(item) ⇒ Object



43
44
45
# File 'lib/iopromise/executor_pool/base.rb', line 43

def begin_executing(item)
  item.beginning
end

#execute_continueObject

Continue execution of one or more pending IOPromises assigned to this pool. Implementations may choose to pre-register IO handled using:

ExecutorContext.current.register_observer_io(...)

Alternatively, they can be registered when this function is called. During this function, implementations should check for timeouts and run any housekeeping operations.

Must be implemented by subclasses.

Raises:

  • (NotImplementedError)


55
56
57
# File 'lib/iopromise/executor_pool/base.rb', line 55

def execute_continue
  raise NotImplementedError
end

#promise_cancelled(item) ⇒ Object



39
40
41
# File 'lib/iopromise/executor_pool/base.rb', line 39

def promise_cancelled(item)
  @pending.delete(item)
end

#promise_fulfilled(_value, item) ⇒ Object



33
34
35
# File 'lib/iopromise/executor_pool/base.rb', line 33

def promise_fulfilled(_value, item)
  @pending.delete(item)
end

#promise_rejected(_reason, item) ⇒ Object



36
37
38
# File 'lib/iopromise/executor_pool/base.rb', line 36

def promise_rejected(_reason, item)
  @pending.delete(item)
end

#register(item) ⇒ Object



28
29
30
31
# File 'lib/iopromise/executor_pool/base.rb', line 28

def register(item)
  @pending << item
  item.subscribe(self, item, item)
end

#syncObject



59
60
61
62
63
# File 'lib/iopromise/executor_pool/base.rb', line 59

def sync
  @pending.each do |promise|
    promise.sync if promise.is_a?(Promise)
  end
end