Class: Concurrent::Throttle
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::Throttle
- Includes:
- Promises::FactoryMethods
- Defined in:
- lib/concurrent-ruby-edge/concurrent/edge/throttle.rb
Overview
A tool managing concurrency level of tasks. The maximum capacity is set in constructor. Each acquire will lower the available capacity and release will increase it. When there is no available capacity the current thread may either be blocked or an event is returned which will be resolved when capacity becomes available.
The more common usage of the Throttle is with a proxy executor ‘a_throttle.on(Concurrent.global_io_executor)`. Anything executed on the proxy executor will be throttled and execute on the given executor. There can be more than one proxy executors. All abstractions which execute tasks have option to specify executor, therefore the proxy executor can be injected to any abstraction throttling its concurrency level.
Instance Method Summary collapse
-
#acquire(timeout = nil) { ... } ⇒ Object, ...
Blocks current thread until there is capacity available in the throttle.
-
#available_capacity ⇒ Integer
The available capacity.
-
#default_executor ⇒ ExecutorService
Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled.
-
#initialize(capacity) ⇒ Throttle
constructor
Create throttle.
-
#max_capacity ⇒ Integer
The maximum capacity.
-
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.
-
#release ⇒ self
Releases previously acquired capacity back to Throttle.
-
#to_s ⇒ String
(also: #inspect)
Short string representation.
-
#try_acquire ⇒ true, false
Tries to acquire capacity from the throttle.
Methods included from Promises::FactoryMethods
#zip_futures_over, #zip_futures_over_on
Constructor Details
#initialize(capacity) ⇒ Throttle
Create throttle.
37 38 39 40 41 42 43 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37 def initialize(capacity) super() @MaxCapacity = capacity @Queue = LockFreeQueue.new @executor_cache = [nil, nil] self.capacity = capacity end |
Instance Method Details
#acquire(timeout = nil) { ... } ⇒ Object, ...
Blocks current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If block is passed then the block is called after the capacity is acquired and it is automatically released after the block is executed.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63 def acquire(timeout = nil, &block) event = acquire_or_event if event within_timeout = event.wait(timeout) # release immediately when acquired later after the timeout since it is unused event.on_resolution!(self, &:release) unless within_timeout else within_timeout = true end called = false if timeout if block if within_timeout called = true block.call else nil end else within_timeout end else if block called = true block.call else self end end ensure release if called end |
#available_capacity ⇒ Integer
Returns The available capacity.
30 31 32 33 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30 def available_capacity current_capacity = capacity current_capacity >= 0 ? current_capacity : 0 end |
#default_executor ⇒ ExecutorService
Uses executor provided by #on therefore all events and futures created using factory methods on this object will be throttled. Overrides Promises::FactoryMethods#default_executor.
183 184 185 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183 def default_executor on(super) end |
#max_capacity ⇒ Integer
Returns The maximum capacity.
46 47 48 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46 def max_capacity @MaxCapacity end |
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
Returns An executor which wraps given executor and allows to post tasks only as available capacity in the throttle allows.
162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162 def on(executor = Promises::FactoryMethods.default_executor) current_executor, current_cache = @executor_cache return current_cache if current_executor == executor && current_cache if current_executor.nil? # cache first proxy proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor)) @executor_cache = [executor, proxy_executor] return proxy_executor else # do not cache more than 1 executor ProxyExecutor.new(self, Concurrent.executor(executor)) end end |
#release ⇒ self
Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.
118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118 def release while true current_capacity = capacity if compare_and_set_capacity current_capacity, current_capacity + 1 if current_capacity < 0 # release called after trigger which pushed a trigger, busy wait is ok Thread.pass until (trigger = @Queue.pop) trigger.resolve end return self end end end |
#to_s ⇒ String Also known as: inspect
Returns Short string representation.
133 134 135 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133 def to_s format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity end |
#try_acquire ⇒ true, false
Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.
102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102 def try_acquire while true current_capacity = capacity if current_capacity > 0 return true if compare_and_set_capacity( current_capacity, current_capacity - 1) else return false end end end |