Class: Kafka::Pause

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/pause.rb

Overview

Manages the pause state of a partition.

The processing of messages in a partition can be paused, e.g. if there was an exception during processing. This could be caused by a downstream service not being available. A typical way of solving such an issue is to back off for a little while and then try again. In order to do that, pause the partition.

Instance Method Summary collapse

Constructor Details

#initialize(clock: Time) ⇒ Pause

Returns a new instance of Pause.



12
13
14
15
16
17
18
19
# File 'lib/kafka/pause.rb', line 12

def initialize(clock: Time)
  @clock = clock
  @started_at = nil
  @pauses = 0
  @timeout = nil
  @max_timeout = nil
  @exponential_backoff = false
end

Instance Method Details

#expired?Boolean

Whether the pause has expired.

Returns:

  • (Boolean)


66
67
68
69
70
71
72
# File 'lib/kafka/pause.rb', line 66

def expired?
  # We never expire the pause if timeout is nil.
  return false if @timeout.nil?

  # Have we passed the end of the pause duration?
  @clock.now >= ends_at
end

#pause!(timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ Object

Mark the partition as paused.

If exponential backoff is enabled, each subsequent pause of a partition will cause a doubling of the actual timeout, i.e. for pause number n, the actual timeout will be 2^n * timeout.

Only when #reset! is called is this state cleared.

Parameters:

  • timeout (nil, Integer) (defaults to: nil)

    if specified, the partition will automatically resume after this many seconds.

  • exponential_backoff (Boolean) (defaults to: false)

    whether to enable exponential timeouts.



32
33
34
35
36
37
38
# File 'lib/kafka/pause.rb', line 32

def pause!(timeout: nil, max_timeout: nil, exponential_backoff: false)
  @started_at = @clock.now
  @timeout = timeout
  @max_timeout = max_timeout
  @exponential_backoff = exponential_backoff
  @pauses += 1
end

#pause_durationObject



57
58
59
60
61
62
63
# File 'lib/kafka/pause.rb', line 57

def pause_duration
  if paused?
    Time.now - @started_at
  else
    0
  end
end

#paused?Boolean

Whether the partition is currently paused. The pause may have expired, in which case #expired? should be checked as well.

Returns:

  • (Boolean)


52
53
54
55
# File 'lib/kafka/pause.rb', line 52

def paused?
  # This is nil if we're not currently paused.
  !@started_at.nil?
end

#reset!Object

Resets the pause state, ensuring that the next pause is not exponential.



75
76
77
# File 'lib/kafka/pause.rb', line 75

def reset!
  @pauses = 0
end

#resume!Object

Resumes the partition.

The number of pauses is still retained, and if the partition is paused again it may be with an exponential backoff.



44
45
46
47
48
# File 'lib/kafka/pause.rb', line 44

def resume!
  @started_at = nil
  @timeout = nil
  @max_timeout = nil
end