Class: Kafka::Pause
- Inherits:
- Object (hierarchy: Object, Kafka::Pause)
- Defined in:
- lib/kafka/pause.rb
Overview
Manages the pause state of a partition.
The processing of messages in a partition can be paused, e.g. if there was an exception during processing. This could be caused by a downstream service not being available. A typical way of solving such an issue is to back off for a little while and then try again. In order to do that, pause the partition.
Instance Method Summary
- #expired? ⇒ Boolean
  Whether the pause has expired.
- #initialize(clock: Time) ⇒ Pause (constructor)
  A new instance of Pause.
- #pause!(timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ Object
  Mark the partition as paused.
- #pause_duration ⇒ Object
- #paused? ⇒ Boolean
  Whether the partition is currently paused.
- #reset! ⇒ Object
  Resets the pause state, ensuring that the next pause is not exponential.
- #resume! ⇒ Object
  Resumes the partition.
Constructor Details
#initialize(clock: Time) ⇒ Pause
Returns a new instance of Pause.
    # File 'lib/kafka/pause.rb', line 12
    def initialize(clock: Time)
      @clock = clock
      @started_at = nil
      @pauses = 0
      @timeout = nil
      @max_timeout = nil
      @exponential_backoff = false
    end
Instance Method Details
#expired? ⇒ Boolean
Whether the pause has expired.
    # File 'lib/kafka/pause.rb', line 66
    def expired?
      # We never expire the pause if timeout is nil.
      return false if @timeout.nil?

      # Have we passed the end of the pause duration?
      @clock.now >= ends_at
    end
#pause!(timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ Object
Mark the partition as paused.
If exponential backoff is enabled, each subsequent pause of a partition will cause a doubling of the actual timeout, i.e. for pause number n, the actual timeout will be 2^n * timeout.
Only when #reset! is called is this state cleared.
    # File 'lib/kafka/pause.rb', line 32
    def pause!(timeout: nil, max_timeout: nil, exponential_backoff: false)
      @started_at = @clock.now
      @timeout = timeout
      @max_timeout = max_timeout
      @exponential_backoff = exponential_backoff
      @pauses += 1
    end
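To make the doubling rule concrete, here is a small arithmetic sketch of the 2^n * timeout formula described above. The helper `effective_timeout` is hypothetical and not part of Kafka::Pause. It assumes pauses are counted from n = 0 and that `max_timeout`, when given, acts as an upper bound on the result; neither detail is stated on this page.

```ruby
# Hypothetical helper, not part of Kafka::Pause.
# Computes 2^n * base_timeout for pause number n (assumed to start at 0),
# optionally capped at max_timeout (assumed semantics).
def effective_timeout(base_timeout, n, max_timeout: nil)
  timeout = base_timeout * (2**n)
  timeout = max_timeout if max_timeout && timeout > max_timeout
  timeout
end

schedule = (0..4).map { |n| effective_timeout(10, n, max_timeout: 60) }
puts schedule.inspect # [10, 20, 40, 60, 60]
```

With a base timeout of 10 seconds, the uncapped values would be 10, 20, 40, 80 and 160; the assumed cap of 60 flattens the last two.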
#pause_duration ⇒ Object
    # File 'lib/kafka/pause.rb', line 57
    def pause_duration
      if paused?
        Time.now - @started_at
      else
        0
      end
    end
#paused? ⇒ Boolean
Whether the partition is currently paused. A partition can still be marked as paused after its pause has expired, so #expired? should be checked as well.
    # File 'lib/kafka/pause.rb', line 52
    def paused?
      # This is nil if we're not currently paused.
      !@started_at.nil?
    end
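The interplay between #paused? and #expired? can be sketched with an injected clock. The code below is a condensed stand-in written for illustration, not the library class: `FakeClock` and `PauseSketch` are hypothetical names, backoff is left out, and the expiry check assumes the pause simply ends `timeout` seconds after it started.

```ruby
# Hypothetical test clock: any object that responds to `now` can replace Time.
class FakeClock
  attr_accessor :now

  def initialize(now)
    @now = now
  end
end

# Condensed stand-in for illustration only; not Kafka::Pause itself.
class PauseSketch
  def initialize(clock: Time)
    @clock = clock
    @started_at = nil
    @timeout = nil
  end

  def pause!(timeout: nil)
    @started_at = @clock.now
    @timeout = timeout
  end

  def paused?
    !@started_at.nil?
  end

  def expired?
    # A pause without a timeout never expires.
    return false if @timeout.nil?

    # Assumption: no backoff, the pause ends `timeout` seconds after it began.
    @clock.now >= @started_at + @timeout
  end

  def resume!
    @started_at = nil
    @timeout = nil
  end
end

clock = FakeClock.new(Time.at(0))
pause = PauseSketch.new(clock: clock)

pause.pause!(timeout: 30)
clock.now += 31 # advance the fake clock past the timeout

# The partition is still marked as paused, but the pause has expired,
# so the caller resumes it.
pause.resume! if pause.paused? && pause.expired?
puts pause.paused? # false
```

Passing the clock in through the constructor, as the documented `clock:` parameter allows, lets the expiry logic be exercised without sleeping.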
#reset! ⇒ Object
Resets the pause state, ensuring that the next pause is not exponential.
    # File 'lib/kafka/pause.rb', line 75
    def reset!
      @pauses = 0
    end
#resume! ⇒ Object
Resumes the partition.
The number of pauses is still retained, and if the partition is paused again it may be with an exponential backoff.
    # File 'lib/kafka/pause.rb', line 44
    def resume!
      @started_at = nil
      @timeout = nil
      @max_timeout = nil
    end
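The difference between #resume! and #reset! comes down to the pause counter. The following sketch uses a hypothetical stand-in, `PauseCounterSketch`, that tracks only that counter; the `pauses` reader exists purely for illustration and is not part of the documented API.

```ruby
# Hypothetical stand-in that tracks only the pause counter.
# The `pauses` reader is added for illustration; Kafka::Pause does not document one.
class PauseCounterSketch
  attr_reader :pauses

  def initialize
    @started_at = nil
    @pauses = 0
  end

  def pause!
    @started_at = Time.now
    @pauses += 1
  end

  def resume!
    @started_at = nil # the counter is left untouched
  end

  def reset!
    @pauses = 0
  end
end

pause = PauseCounterSketch.new
pause.pause!
pause.resume!
pause.pause!
puts pause.pauses # 2: resume! did not clear the count

pause.resume!
pause.reset!
puts pause.pauses # 0: the next pause starts the backoff sequence over
```

A caller would typically resume after each expired pause and reset only once processing succeeds again, so that repeated failures keep lengthening the backoff.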