Class: Freno::Throttler
- Inherits:
-
Object
- Object
- Freno::Throttler
- Defined in:
- lib/freno/throttler.rb,
lib/freno/throttler/errors.rb,
lib/freno/throttler/mapper.rb,
lib/freno/throttler/instrumenter.rb,
lib/freno/throttler/circuit_breaker.rb
Overview
Freno::Throttler is the class responsible for throttling writes to a cluster or a set of clusters. Throttling means to slow down the pace at which write operations occur by checking with freno whether all the clusters affected by the operation are in good health before allowing it. If any of the clusters is not in good health, the throttler will wait some time and repeat the process.
Examples:
Let’s use the following throttler, which uses Mapper::Identity implicitly. (see #initialze docs)
“‘ throttler = Throttler.new(client: freno_client, app: :my_app) data.find_in_batches do |batch|
throttler.throttle([:mysqla, :mysqlb]) do
update(batch)
end
end “‘
Before each call to ‘update(batch)` the throttler will call freno to check the health of the `mysqla` and `mysqlb` stores on behalf of :my_app; and sleep if any of the stores is not ok.
Defined Under Namespace
Modules: CircuitBreaker, Instrumenter, Mapper Classes: CircuitOpen, ClientError, Error, WaitedTooLong
Constant Summary collapse
- DEFAULT_WAIT_SECONDS =
0.5
- DEFAULT_MAX_WAIT_SECONDS =
10
- REQUIRED_ARGS =
%i[ client app mapper instrumenter circuit_breaker wait_seconds max_wait_seconds ].freeze
Instance Attribute Summary collapse
-
#app ⇒ Object
Returns the value of attribute app.
-
#circuit_breaker ⇒ Object
Returns the value of attribute circuit_breaker.
-
#client ⇒ Object
Returns the value of attribute client.
-
#instrumenter ⇒ Object
Returns the value of attribute instrumenter.
-
#mapper ⇒ Object
Returns the value of attribute mapper.
-
#max_wait_seconds ⇒ Object
Returns the value of attribute max_wait_seconds.
-
#wait_seconds ⇒ Object
Returns the value of attribute wait_seconds.
Instance Method Summary collapse
-
#initialize(client: nil, app: nil, mapper: Mapper::Identity, instrumenter: Instrumenter::Noop, circuit_breaker: CircuitBreaker::Noop, wait_seconds: DEFAULT_WAIT_SECONDS, max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS) {|_self| ... } ⇒ Throttler
constructor
Initializes a new instance of the throttler.
-
#throttle(context = nil, **options) ⇒ Object
This method receives a context to infer the set of stores that it needs to throttle writes to.
Constructor Details
#initialize(client: nil, app: nil, mapper: Mapper::Identity, instrumenter: Instrumenter::Noop, circuit_breaker: CircuitBreaker::Noop, wait_seconds: DEFAULT_WAIT_SECONDS, max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS) {|_self| ... } ⇒ Throttler
Initializes a new instance of the throttler
In order to initialize a Throttler you need the following arguments:
- a `client`: a instance of Freno::Client
- an `app`: a symbol indicating the app-name for which Freno will respond
checks.
Also, you can optionally provide the following named arguments:
- `:mapper`: An object that responds to `call(context)` and returns a
`Enumerable` of the store names for which we need to wait for
replication delay. By default this is the `IdentityMapper`, which will
check the stores given as context.
For example, if the `throttler` object used the default mapper:
```
throttler.throttle(:mysqlc) do
update(batch)
end
```
- `:instrumenter`: An object that responds to
`instrument(event_name, context = {}, &block)` that can be used to
add cross-cutting concerns like logging or stats to the throttler.
By default, the instrumenter is `Instrumenter::Noop`, which does
nothing but yielding the block it receives.
- `:circuit_breaker`: An object responding to `allow_request?`,
`success`, and `failure?`, compatible with `Resilient::CircuitBreaker`
(see https://github.com/jnunemaker/resilient).
By default, the circuit breaker is `CircuitBreaker::Noop`, which
always allows requests, and does not provide resiliency guarantees.
- `:wait_seconds`: A positive float indicating the number of seconds the
throttler will wait before checking again, in case some of the stores
didn't catch-up the last time they were check.
- `:max_wait_seconds`: A positive float indicating the maxium number of
seconds the throttler will wait in total for replicas to catch-up
before raising a `WaitedTooLong` error.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/freno/throttler.rb', line 102 def initialize( client: nil, app: nil, mapper: Mapper::Identity, instrumenter: Instrumenter::Noop, circuit_breaker: CircuitBreaker::Noop, wait_seconds: DEFAULT_WAIT_SECONDS, max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS ) @client = client @app = app @mapper = mapper @instrumenter = instrumenter @circuit_breaker = circuit_breaker @wait_seconds = wait_seconds @max_wait_seconds = max_wait_seconds yield self if block_given? validate_args end |
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def app @app end |
#circuit_breaker ⇒ Object
Returns the value of attribute circuit_breaker.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def circuit_breaker @circuit_breaker end |
#client ⇒ Object
Returns the value of attribute client.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def client @client end |
#instrumenter ⇒ Object
Returns the value of attribute instrumenter.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def instrumenter @instrumenter end |
#mapper ⇒ Object
Returns the value of attribute mapper.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def mapper @mapper end |
#max_wait_seconds ⇒ Object
Returns the value of attribute max_wait_seconds.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def max_wait_seconds @max_wait_seconds end |
#wait_seconds ⇒ Object
Returns the value of attribute wait_seconds.
48 49 50 |
# File 'lib/freno/throttler.rb', line 48 def wait_seconds @wait_seconds end |
Instance Method Details
#throttle(context = nil, **options) ⇒ Object
This method receives a context to infer the set of stores that it needs to throttle writes to. It can also receive additional options which are passed to the underlying Check request object:
“‘ throttler = Throttler.new(client: freno_client, app: :my_app) data.find_in_batches do |batch|
throttler.throttle(:mysqla, low_priority: true) do
update(batch)
end
end “‘
With that information it asks freno whether all the stores are ok. In case they are, it executes the given block. Otherwise, it waits ‘wait_seconds` before trying again.
In case the throttler has waited more than ‘max_wait_seconds`, it raises a `WaitedTooLong` error.
In case there’s an underlying Freno error, it raises a ‘ClientError` error.
In case the circuit breaker is open, it raises a ‘CircuitOpen` error.
this method is instrumented, the instrumenter will receive the following events:
-
“throttler.called” each time this method is called
-
“throttler.succeeded” when the stores were ok, before yielding the block
-
“throttler.waited” when the stores were not ok, after waiting ‘wait_seconds`
-
“throttler.waited_too_long” when the stores were not ok, but the thottler already waited at least ‘max_wait_seconds`, right before raising `WaitedTooLong`
-
“throttler.freno_errored” when there was an error with freno, before raising ‘ClientError`.
-
“throttler.circuit_open” when the circuit breaker does not allow the next request, before raising ‘CircuitOpen`
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/freno/throttler.rb', line 164 def throttle(context = nil, **) store_names = mapper.call(context) instrument(:called, store_names: store_names) waited = 0 while true unless circuit_breaker.allow_request? instrument(:circuit_open, store_names: store_names, waited: waited) raise CircuitOpen end if all_stores_ok?(store_names, **) instrument(:succeeded, store_names: store_names, waited: waited) circuit_breaker.success break end if waited + wait_seconds > max_wait_seconds instrument(:waited_too_long, store_names: store_names, waited: waited, max: max_wait_seconds) circuit_breaker.failure raise WaitedTooLong.new(waited_seconds: waited, max_wait_seconds: max_wait_seconds) else wait waited += wait_seconds instrument(:waited, store_names: store_names, waited: waited, max: max_wait_seconds) end end yield end |