Class: BackPressure::GatedExecutor
Defined in: lib/back_pressure/gated_executor.rb
Overview
A GatedExecutor is an implementation of Executor that allows external control of back-pressure state, and is useful when non-blocking APIs provide hooks for identifying when they should block.
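To make the idea of an externally controlled gate concrete, here is a minimal sketch built from Ruby's `Mutex` and `ConditionVariable`. The class `MiniGate` and its method names are hypothetical stand-ins for illustration; this is not the library's implementation, and it omits logging, time limits, and blocked-thread tracking.

```ruby
# Hypothetical stand-in that illustrates the gating idea; it is NOT the
# BackPressure::GatedExecutor implementation.
class MiniGate
  def initialize
    @mutex = Mutex.new
    @condv = ConditionVariable.new
    @engaged = false
  end

  # Close the gate: later callers of #execute will wait.
  def engage
    @mutex.synchronize { @engaged = true }
  end

  # Open the gate and wake every waiting thread.
  def remove
    @mutex.synchronize do
      @engaged = false
      @condv.broadcast
    end
  end

  # Wait until the gate is open, then run the block outside the lock.
  def execute
    @mutex.synchronize do
      @condv.wait(@mutex) while @engaged
    end
    yield
    true
  end
end

gate = MiniGate.new
gate.engage                      # e.g. a client's "buffer full" hook fired

worker = Thread.new { gate.execute { :sent } }
sleep 0.05                       # give the worker time to reach the gate
puts worker.alive?               # the worker should be parked behind the gate

gate.remove                      # e.g. a client's "drained" hook fired
worker.join
```

In this sketch the two hooks of a non-blocking client map onto `engage` and `remove`, while producer threads wrap their work in `execute`.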
Instance Method Summary
- #back_pressure_engaged? ⇒ Boolean
  Helper method for determining if back-pressure is currently engaged.
- #blocked? ⇒ Boolean
  Helper method for determining if any threads are currently blocked by back-pressure.
- #blocked_threads ⇒ Set{Thread}
  Helper method for observing which threads, if any, are blocked at the instant the method is invoked.
- #engage_back_pressure(reason = DEFAULT_REASON) ⇒ void
  Engages back-pressure and immediately returns; threads that call `GatedExecutor#execute` on this instance will be blocked until back-pressure is removed.
- #execute(blocking_time_limit = nil) ⇒ Boolean
  Executes the provided block, after waiting out any back-pressure, returning `true` if and only if the block was executed.
- #execute!(blocking_time_limit = nil) ⇒ Object
  Executes the provided block, after waiting out any back-pressure, returning the result of the block or raising an `ExecutionExpired` exception if the provided limit was reached before execution could begin.
- #initialize(logger: nil, description: nil, log_threshold: 1) {|gated_executor| ... } ⇒ GatedExecutor (constructor)
  A new instance of GatedExecutor.
- #remove_back_pressure(reason = DEFAULT_REASON) ⇒ void
  Removes back-pressure, waking any threads that are currently blocked by back-pressure, and immediately returns.
Constructor Details
#initialize(logger: nil, description: nil, log_threshold: 1) {|gated_executor| ... } ⇒ GatedExecutor
Returns a new instance of GatedExecutor.
    # File 'lib/back_pressure/gated_executor.rb', line 59

    def initialize(logger: nil, description: nil, log_threshold: 1)
      @logger = logger
      @desc = (description ? description.dup : "#{self.class.name}<#{__id__}>").freeze
      @log_threshold = log_threshold

      @control_mutex = Mutex.new
      @control_condv = ConditionVariable.new

      @blocked_threads = Set.new
      @blocked_threads_mutex = Mutex.new

      yield(self) if block_given?
    end
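Two conventions in the constructor above are easy to miss: the description is copied and frozen, and the new instance is yielded to an optional block. The sketch below reproduces only those two conventions in a hypothetical class named `GateLike`; the class, the `desc` reader, and the example description are assumptions made for illustration.

```ruby
# Hypothetical stand-in mirroring only the constructor conventions shown above.
class GateLike
  attr_reader :desc

  def initialize(description: nil)
    # Copy and freeze so later mutation of the caller's string cannot leak in.
    @desc = (description ? description.dup : "#{self.class.name}<#{__id__}>").freeze
    yield(self) if block_given?
  end
end

name = +"kafka-output"           # unfrozen string owned by the caller
configured = nil
gate = GateLike.new(description: name) { |g| configured = g }

name << "-renamed"               # does not affect the gate's description
puts gate.desc                   # prints "kafka-output"
puts configured.equal?(gate)     # prints "true": the block received the new instance
puts GateLike.new.desc.frozen?   # prints "true": the default description is frozen too
```

The block form is a convenient place to register the new instance with whatever hooks will later engage and remove back-pressure.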
Instance Method Details
#back_pressure_engaged? ⇒ Boolean
Helper method for determining if back-pressure is currently engaged.
    # File 'lib/back_pressure/gated_executor.rb', line 119

    def back_pressure_engaged?
      @control_mutex.synchronize { @back_pressure_engaged }
    end
#blocked? ⇒ Boolean
This method should be used only for observation-based tooling.
Helper method for determining if any threads are currently blocked by back-pressure.
    # File 'lib/back_pressure/gated_executor.rb', line 162

    def blocked?
      blocked_threads.any?
    end
#blocked_threads ⇒ Set{Thread}
This method should be used only for observation-based tooling.
Helper method for observing which threads, if any, are blocked at the instant the method is invoked. The returned value is a frozen snapshot, and the included threads are not guaranteed to be still blocking by the time they are accessed.
    # File 'lib/back_pressure/gated_executor.rb', line 156

    def blocked_threads
      @blocked_threads_mutex.synchronize { @blocked_threads.dup.freeze }
    end
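The "frozen snapshot" behaviour described above can be illustrated in isolation. The sketch below uses a hypothetical `ThreadRegistry` class (not part of the library) that copies its internal set under a mutex and freezes the copy, the same pattern as `@blocked_threads.dup.freeze`.

```ruby
require 'set'

# Hypothetical registry illustrating the "frozen snapshot" pattern shown above.
class ThreadRegistry
  def initialize
    @threads = Set.new
    @mutex = Mutex.new
  end

  def add(thread)
    @mutex.synchronize { @threads << thread }
  end

  def remove(thread)
    @mutex.synchronize { @threads.delete(thread) }
  end

  # Copy under the lock, then freeze the copy; the internal set stays mutable.
  def snapshot
    @mutex.synchronize { @threads.dup.freeze }
  end
end

registry = ThreadRegistry.new
registry.add(Thread.current)

snap = registry.snapshot
registry.remove(Thread.current)   # the registry changes after the snapshot

puts snap.include?(Thread.current)               # the old snapshot is unchanged
puts registry.snapshot.include?(Thread.current)  # a fresh snapshot reflects the removal

begin
  snap << Thread.current
rescue FrozenError => e
  puts "snapshot is read-only: #{e.class}"
end
```

Because a snapshot can be stale the moment it is returned, it suits monitoring and diagnostics rather than control flow.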
#engage_back_pressure(reason = DEFAULT_REASON) ⇒ void
This method returns an undefined value.
Engages back-pressure and immediately returns; threads that call `GatedExecutor#execute` on this instance will be blocked until back-pressure is removed.
    # File 'lib/back_pressure/gated_executor.rb', line 84

    def engage_back_pressure(reason=DEFAULT_REASON)
      @control_mutex.synchronize do
        if !@back_pressure_engaged
          @back_pressure_engaged = true
          @logger && @logger.info("#{@desc} back-pressure engaged (#{reason})")
        else
          @logger && @logger.debug("#{@desc} attempted to engage back-pressure when it is already engaged (#{reason})")
        end
      end
    end
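The method above is idempotent: only the first call changes state and logs at INFO, while repeat calls log at DEBUG. The following sketch reproduces just that behaviour in a hypothetical `EngageOnly` class so the two log levels can be observed. The class name, the shortened log messages, and the default reason string are assumptions; the value of the library's `DEFAULT_REASON` is not shown in this document.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in reproducing only the engage-side logging shown above.
class EngageOnly
  def initialize(logger)
    @logger = logger
    @mutex = Mutex.new
    @engaged = false
  end

  # 'no reason given' is a placeholder default, not the library's DEFAULT_REASON.
  def engage(reason = 'no reason given')
    @mutex.synchronize do
      if !@engaged
        @engaged = true
        @logger && @logger.info("gate back-pressure engaged (#{reason})")
      else
        @logger && @logger.debug("gate already engaged (#{reason})")
      end
    end
  end
end

log = StringIO.new
gate = EngageOnly.new(Logger.new(log))

gate.engage('buffer full')   # state change: logged at INFO
gate.engage('buffer full')   # no state change: logged at DEBUG

puts log.string
```

Passing a meaningful reason makes the log useful when several hooks can engage the same gate.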
#execute(blocking_time_limit = nil) ⇒ Boolean
Engage and remove back-pressure from outside the provided block; the block is not executed while back-pressure is engaged, so it cannot be relied on to remove it.
Executes the provided block, after waiting out any back-pressure, returning `true` if and only if the block was executed.
    # File 'lib/back_pressure/gated_executor.rb', line 129

    def execute(blocking_time_limit=nil)
      fail(ArgumentError, 'block required!') unless block_given?

      if !@back_pressure_engaged || block_until_back_pressure_removed(blocking_time_limit)
        yield
        return true
      else
        return false
      end
    end
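The private helper `block_until_back_pressure_removed` is not shown in this document, so the following is only a sketch of how a time-limited wait could be written with `ConditionVariable#wait` and a monotonic deadline. The class `TimedGate` and its helper `wait_until_open` are hypothetical and may differ from the library's actual approach.

```ruby
# Hypothetical sketch of a time-limited gate; the library's private
# block_until_back_pressure_removed is not shown here and may differ.
class TimedGate
  def initialize
    @mutex = Mutex.new
    @condv = ConditionVariable.new
    @engaged = false
  end

  def engage
    @mutex.synchronize { @engaged = true }
  end

  def remove
    @mutex.synchronize do
      @engaged = false
      @condv.broadcast
    end
  end

  # Returns true if the block ran, false if the limit expired first.
  def execute(limit = nil)
    raise ArgumentError, 'block required!' unless block_given?
    return false unless wait_until_open(limit)

    yield
    true
  end

  private

  # Waits while the gate is engaged; a nil limit waits indefinitely.
  def wait_until_open(limit)
    deadline = limit && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + limit)
    @mutex.synchronize do
      while @engaged
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        return false if remaining && remaining <= 0

        @condv.wait(@mutex, remaining)
      end
    end
    true
  end
end

gate = TimedGate.new
gate.engage
puts gate.execute(0.05) { puts 'not reached while engaged' }  # limit expires first

gate.remove
puts gate.execute(0.05) { puts 'runs immediately' }
```

Re-checking the flag in a loop guards against spurious wakeups, and recomputing the remaining time keeps the total wait within the limit.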
#execute!(blocking_time_limit = nil) ⇒ Object
Engage and remove back-pressure from outside the provided block; the block is not executed while back-pressure is engaged, so it cannot be relied on to remove it.
Executes the provided block, after waiting out any back-pressure, returning the result of the block or raising an `ExecutionExpired` exception if the provided limit was reached before execution could begin.
    # File 'lib/back_pressure/gated_executor.rb', line 146

    def execute!(blocking_time_limit=nil)
      execute(blocking_time_limit) do
        return yield
      end

      fail(ExecutionExpired)
    end
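The method above relies on a Ruby detail: `return` inside a block returns from the enclosing method, so the `fail` line is reached only when the block never ran. The sketch below isolates that control flow using hypothetical stand-ins (`demo_execute`, `demo_execute!`, and `DemoExpired`), with a plain boolean in place of the back-pressure state.

```ruby
# Stand-in error class and methods for illustration only.
class DemoExpired < StandardError; end

# Runs the block only when `open` is true, like a gate that is not engaged.
def demo_execute(open)
  return false unless open

  yield
  true
end

def demo_execute!(open)
  demo_execute(open) do
    return yield   # returns from demo_execute!, carrying the block's value
  end

  raise DemoExpired # reached only when the block above never ran
end

puts demo_execute!(true) { 6 * 7 }   # prints 42

begin
  demo_execute!(false) { 6 * 7 }
rescue DemoExpired
  puts 'expired before execution could begin'
end
```

One consequence of this design is that a block returning `nil` or `false` still counts as executed; the exception signals only that the block was never started.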
#remove_back_pressure(reason = DEFAULT_REASON) ⇒ void
No guarantees of ordering are made with regard to threads that are blocked at the instant back-pressure is removed.
This method returns an undefined value.
Removes back-pressure, waking any threads that are currently blocked by back-pressure, and immediately returns.
    # File 'lib/back_pressure/gated_executor.rb', line 103

    def remove_back_pressure(reason=DEFAULT_REASON)
      @control_mutex.synchronize do
        if @back_pressure_engaged
          @back_pressure_engaged = false
          @logger && @logger.info("#{@desc} back-pressure removed (#{reason})")
          @control_condv.broadcast # wakeup _all_ waiting threads
        else
          @logger && @logger.debug("#{@desc} attempted to remove back-pressure when it not engaged (#{reason})")
        end
      end
    end
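The effect of `broadcast`, and the lack of any ordering guarantee, can be seen with plain Ruby primitives. The sketch below is an illustration only; the helper `wake_order` is hypothetical and records the order in which waiting threads resume after a single broadcast.

```ruby
# Illustration only: one broadcast wakes every waiter, in no guaranteed order.
def wake_order(count)
  mutex = Mutex.new
  condv = ConditionVariable.new
  engaged = true
  woken = Queue.new

  waiters = count.times.map do |i|
    Thread.new do
      mutex.synchronize do
        condv.wait(mutex) while engaged
      end
      woken << i
    end
  end

  sleep 0.05                    # let the waiters reach the condition variable
  mutex.synchronize do
    engaged = false
    condv.broadcast             # signal would wake only one waiter
  end
  waiters.each(&:join)

  Array.new(count) { woken.pop }
end

order = wake_order(3)
puts order.sort.inspect         # [0, 1, 2]; the unsorted order may vary between runs
```

Callers that need work to resume in a particular order have to arrange that themselves, for example by funnelling work through a single thread.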