Class: IOPromise::Faraday::FaradayExecutorPool

Inherits:
ExecutorPool::Base
  • Object
show all
Defined in:
lib/iopromise/faraday/executor_pool.rb

Instance Method Summary

Constructor Details

#initialize ⇒ FaradayExecutorPool

Returns a new instance of FaradayExecutorPool.



8
9
10
11
12
13
14
15
# File 'lib/iopromise/faraday/executor_pool.rb', line 8

# Builds the pool and attaches it to a ContinuableHydra.
#
# Accepts any positional arguments and forwards them unchanged to the
# superclass initializer through the bare +super+ call.
def initialize(*)
  super

  # Fetch the hydra via ContinuableHydra.for_current_thread and register this
  # pool as its iop_handler.
  # NOTE(review): presumably the hydra then calls back into the monitor_* /
  # set_* methods of this pool -- confirm against ContinuableHydra.
  @hydra = ContinuableHydra.for_current_thread
  @hydra.iop_handler = self

  # Monitors keyed by IO object; entries are written by #monitor_add and
  # deleted by #monitor_remove.
  @monitors = {}
end

Instance Method Details

#execute_continue ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/iopromise/faraday/executor_pool.rb', line 39

# Flags every pending promise that has not started yet as executing, then
# hands control to the hydra.
#
# Promises are flagged up front because the hydra could start any of them at
# any moment; doing this at dequeue time would be the ideal place.
#
# @return [Object] whatever +@hydra.execute_continue+ returns
def execute_continue
  @pending.each do |queued|
    # Already marked on an earlier pass; nothing to do for this one.
    next if queued.started_executing?

    begin_executing(queued)
  end

  @hydra.execute_continue
end

#monitor_add(io) ⇒ Object



17
18
19
# File 'lib/iopromise/faraday/executor_pool.rb', line 17

# Registers +io+ with the current executor context, with this pool as the
# observer and +:r+ as the interest, and stores the resulting monitor under
# +io+ in +@monitors+.
#
# @param io [Object] the IO object to observe
# @return [Object] the value returned by +register_observer_io+
def monitor_add(io)
  context = ::IOPromise::ExecutorContext.current
  @monitors[io] = context.register_observer_io(self, io, :r)
end

#monitor_ready(monitor, readiness) ⇒ Object



35
36
37
# File 'lib/iopromise/faraday/executor_pool.rb', line 35

# Forwards a monitor's IO object and its readable/writable flags to the hydra.
#
# @param monitor [Object] must respond to +io+, +readable?+ and +writable?+
# @param readiness [Object] accepted but not used by this method
# @return [Object] whatever +@hydra.socket_is_ready+ returns
def monitor_ready(monitor, readiness)
  socket = monitor.io
  can_read = monitor.readable?
  can_write = monitor.writable?

  @hydra.socket_is_ready(socket, can_read, can_write)
end

#monitor_remove(io) ⇒ Object



21
22
23
24
# File 'lib/iopromise/faraday/executor_pool.rb', line 21

# Removes the monitor stored for +io+ from +@monitors+ and closes it.
# Does nothing beyond the lookup when no monitor is stored for +io+.
#
# @param io [Object] the IO object whose monitor should be dropped
# @return [Object, nil] the result of +close+, or nil when there was no monitor
def monitor_remove(io)
  @monitors.delete(io)&.close
end

#set_interests(io, interest) ⇒ Object



26
27
28
29
# File 'lib/iopromise/faraday/executor_pool.rb', line 26

# Updates the interests of the monitor stored for +io+.
# Leaves everything untouched when no monitor is stored for +io+.
#
# @param io [Object] key used to look the monitor up in +@monitors+
# @param interest [Object] value assigned to the monitor's +interests+
# @return [Object, nil] +interest+ when a monitor was updated, nil otherwise
def set_interests(io, interest)
  registered = @monitors[io]
  return if registered.nil?

  registered.interests = interest
end

#set_timeout(timeout) ⇒ Object



31
32
33
# File 'lib/iopromise/faraday/executor_pool.rb', line 31

# Assigns +timeout+ to this pool's +select_timeout+ attribute.
#
# NOTE(review): the +select_timeout=+ writer is not defined in the visible
# part of this class -- presumably inherited from ExecutorPool::Base; confirm.
#
# @param timeout [Object] passed to +select_timeout=+ unchanged (units are not
#   visible from here)
# @return [Object] the given +timeout+
def set_timeout(timeout)
  self.select_timeout = timeout
end