Class: Concurrent::Throttle

Inherits:
Synchronization::Object
  • Object
Includes:
Promises::FactoryMethods
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb

Overview

A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each acquire lowers the available capacity and each release raises it. When no capacity is available, the current thread is either blocked or given an event that is resolved once capacity becomes available.

The more common way to use a Throttle is through a proxy executor, `a_throttle.on(Concurrent.global_io_executor)`. Anything executed on the proxy executor is throttled and runs on the given executor. There can be more than one proxy executor. All abstractions that execute tasks accept an executor option, so the proxy executor can be injected into any of them to throttle its concurrency level.
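The acquire/release accounting described above can be modelled in a few lines of plain Ruby. The sketch below is a simplified stand-in built on the standard library's Mutex and ConditionVariable; it is not how Concurrent::Throttle is implemented (the source listings on this page use compare-and-set and a lock-free queue), and the names `MiniThrottle`, `running` and `peak` are hypothetical.

```ruby
# Simplified model of capacity accounting; NOT the library's implementation.
class MiniThrottle
  def initialize(capacity)
    @capacity = capacity              # number of free slots
    @lock     = Mutex.new
    @freed    = ConditionVariable.new
  end

  # Blocks until a slot is free, runs the block, then gives the slot back.
  def acquire
    @lock.synchronize do
      @freed.wait(@lock) while @capacity.zero?
      @capacity -= 1
    end
    begin
      yield
    ensure
      release
    end
  end

  # Returns one slot and wakes a single waiting thread, if any.
  def release
    @lock.synchronize do
      @capacity += 1
      @freed.signal
    end
    self
  end
end

throttle = MiniThrottle.new(2)
running  = 0
peak     = 0
stats    = Mutex.new

threads = 6.times.map do
  Thread.new do
    throttle.acquire do
      stats.synchronize do
        running += 1
        peak = [peak, running].max
      end
      sleep 0.01
      stats.synchronize { running -= 1 }
    end
  end
end
threads.each(&:join)

puts "peak concurrency: #{peak}"
```

Six threads compete for two slots, so the recorded peak should never exceed the capacity of 2, whatever order the threads are scheduled in.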

Instance Method Summary

Methods included from Promises::FactoryMethods

#zip_futures_over, #zip_futures_over_on

Constructor Details

#initialize(capacity) ⇒ Throttle

Create throttle.

Parameters:

  • capacity (Integer)

    How many tasks using this throttle can run at the same time.



# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37

def initialize(capacity)
  super()
  @MaxCapacity            = capacity
  @Queue                  = LockFreeQueue.new
  @executor_cache         = [nil, nil]
  self.capacity = capacity
end

Instance Method Details

#acquire(timeout = nil) { ... } ⇒ Object, ...

Blocks the current thread until capacity is available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If a block is passed, it is called after the capacity is acquired, and the capacity is released automatically after the block finishes.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Yields:

  • block to execute after the capacity is acquired

Returns:

  • (Object, self, true, false)
    • With no timeout and no block, returns self

    • With no timeout and a block, returns the result of the block

    • With a timeout and no block, returns true when acquired and false when timed out

    • With a timeout and a block, returns the result of the block, or nil on timing out

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63

def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    # release immediately when acquired later after the timeout since it is unused
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
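
The four return cases of #acquire can be spelled out as a small decision function. This is an illustrative sketch only: `acquire_result` is a hypothetical helper that mirrors just the return-value branches of the listing above, with the `acquired` argument standing in for "capacity was obtained before the timeout expired". It performs no waiting or releasing.

```ruby
# Hypothetical helper mirroring only the return-value branches of #acquire.
# `acquired` stands for "capacity was obtained before the timeout expired".
def acquire_result(throttle, timeout, acquired, &block)
  if timeout
    return acquired unless block      # timeout, no block: true or false
    acquired ? block.call : nil       # timeout, block: block result or nil
  else
    block ? block.call : throttle     # no timeout: block result or the throttle
  end
end

throttle = Object.new # stand-in for a Concurrent::Throttle instance

no_timeout_no_block = acquire_result(throttle, nil, true)
no_timeout_block    = acquire_result(throttle, nil, true) { :work_done }
timeout_no_block    = acquire_result(throttle, 0.5, false)
timeout_block       = acquire_result(throttle, 0.5, false) { :work_done }

p no_timeout_no_block.equal?(throttle) # true
p no_timeout_block                     # :work_done
p timeout_no_block                     # false
p timeout_block                        # nil
```

One consequence is that a block returning nil cannot be told apart from a timeout when both a timeout and a block are given.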

#available_capacity ⇒ Integer

Returns the available capacity, never less than zero.

Returns:

  • (Integer)

    The available capacity.



# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30

def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end

#default_executor ⇒ ExecutorService

Uses the executor provided by #on, so all events and futures created using factory methods on this object are throttled. Overrides Promises::FactoryMethods#default_executor.

Returns:

  • (ExecutorService)

See Also:

  • Promises::FactoryMethods#default_executor


# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183

def default_executor
  on(super)
end

#max_capacity ⇒ Integer

Returns the maximum capacity.

Returns:

  • (Integer)

    The maximum capacity.



# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46

def max_capacity
  @MaxCapacity
end

#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService

Returns an executor which wraps the given executor and lets tasks be posted only as far as the available capacity in the throttle allows.

Examples:

throttling future

a_future.then_on(a_throttle.on(:io)) { a_throttled_task }

Parameters:

  • executor (ExecutorService) (defaults to: Promises::FactoryMethods.default_executor)

Returns:

  • (ExecutorService)

    An executor which wraps the given executor and lets tasks be posted only as far as the available capacity in the throttle allows.



# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162

def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    # cache first proxy
    proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    # do not cache more than 1 executor
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
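
The caching behaviour in the listing above (only the proxy for the first executor is remembered) can be seen in a small stand-alone model. This is a sketch under stated assumptions: `Proxy` and `CachingModel` are hypothetical stand-ins for the library's ProxyExecutor and Throttle, and the executor names are plain symbols rather than real executors.

```ruby
# Simplified model of the single-slot proxy cache; NOT the library's classes.
Proxy = Struct.new(:throttle, :executor)

class CachingModel
  def initialize
    @cache = [nil, nil] # [executor, proxy]
  end

  def on(executor)
    cached_executor, cached_proxy = @cache
    return cached_proxy if cached_executor == executor && cached_proxy

    proxy = Proxy.new(self, executor)
    # Only the first executor ever requested gets its proxy cached.
    @cache = [executor, proxy] if cached_executor.nil?
    proxy
  end
end

model = CachingModel.new
first = model.on(:io)

puts first.equal?(model.on(:io))             # true
puts model.on(:fast).equal?(model.on(:fast)) # false
```

In this model, repeated calls with the first executor reuse one proxy object, while every call with a different executor builds a new proxy. All proxies still share the same throttle, so the capacity limit applies across them.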

#release ⇒ self

Releases previously acquired capacity back to the throttle. Has to be called exactly once for each acquired capacity.

Returns:

  • (self)

See Also:

  • #acquire, #try_acquire


# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118

def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        # release called after trigger which pushed a trigger, busy wait is ok
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end

#to_s ⇒ String (also known as: inspect)

Returns a short string representation.

Returns:

  • (String)

    Short string representation.



# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133

def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end

#try_acquire ⇒ true, false

Tries to acquire capacity from the throttle without blocking. Returns true when capacity was available and has been acquired, false otherwise. The acquired capacity has to be returned to the throttle by calling #release.

Returns:

  • (true, false)

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102

def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
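
Both #release and #try_acquire are built on the same compare-and-set retry loop: read the capacity, attempt to swap in the new value, and start over if another thread changed it in between. The sketch below reproduces that loop with standard-library tools only. `CasCounter` is a hypothetical class whose Mutex-based `compare_and_set` stands in for the atomic field the library uses, and the waiter wake-up that #release performs for negative capacity is left out.

```ruby
# Model of the compare-and-set retry loop; the Mutex-based CAS is a stand-in
# for an atomic field and is NOT the library's implementation.
class CasCounter
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Sets the value to `new_value` only if it still equals `expected`.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value == expected
      @value = new_value
      true
    end
  end
end

# Non-blocking acquire: fails immediately when nothing is available.
def try_acquire(counter)
  loop do
    current = counter.value
    return false unless current > 0
    return true if counter.compare_and_set(current, current - 1)
    # Another thread changed the value in between; read again and retry.
  end
end

# Gives one unit of capacity back, retrying until the swap succeeds.
def release(counter)
  loop do
    current = counter.value
    return if counter.compare_and_set(current, current + 1)
  end
end

counter = CasCounter.new(2)
results = 3.times.map { try_acquire(counter) }
p results               # [true, true, false]

release(counter)
p try_acquire(counter)  # true
```

Because a failed attempt returns false rather than waiting, callers of this pattern typically pair a successful try with a release in an `ensure` clause and fall back to other work when it fails.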