Class: Karafka::TimeTrackers::Pause

Inherits:
Base
  • Object
Defined in:
lib/karafka/time_trackers/pause.rb

Overview

Note:

We do not have to worry about the performance implications of a mutex wrapping most of the code here, as this tracker is not used frequently. It is active only once per batch in the case of long-running jobs, and upon errors.

Handles Kafka topic partition pausing and resuming with exponential back-offs. Since expiring and pausing can happen from both consumer and listener, this needs to be thread-safe.
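To illustrate why the mutex matters, here is a minimal, self-contained sketch in which several threads update shared state through `Mutex#synchronize`, the same per-method locking pattern the methods of this class use. `SketchCounter` is a hypothetical stand-in written for this illustration and is not part of Karafka.

```ruby
# Hypothetical stand-in (not Karafka code): every read and write of the
# shared counter goes through the same Mutex, so concurrent callers
# cannot interleave inside a critical section.
class SketchCounter
  def initialize
    @attempt = 0
    @mutex = Mutex.new
  end

  # Thread-safe increment
  def increment
    @mutex.synchronize { @attempt += 1 }
  end

  # Thread-safe read
  def attempt
    @mutex.synchronize { @attempt }
  end
end

counter = SketchCounter.new

# Four threads standing in for a consumer and listeners touching the same tracker
threads = Array.new(4) { Thread.new { 1_000.times { counter.increment } } }
threads.each(&:join)

counter.attempt #=> 4000
```

Since each critical section is only a few assignments, the lock is held very briefly, which fits the note above about the cost being negligible.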

Constructor Details

#initialize(timeout:, max_timeout:, exponential_backoff:) ⇒ Karafka::TimeTrackers::Pause

Examples:

options = { timeout: 1000, max_timeout: 1000, exponential_backoff: false }
pause = Karafka::TimeTrackers::Pause.new(**options)
pause.expired? #=> true
pause.paused? #=> false
pause.pause
pause.increment
sleep(1.1)
pause.paused? #=> true
pause.expired? #=> true
pause.attempt #=> 1
pause.pause
pause.increment
pause.attempt #=> 2
pause.paused? #=> true
pause.expired? #=> false
pause.resume
pause.attempt #=> 2
pause.paused? #=> false
pause.reset
pause.attempt #=> 0

Parameters:

  • timeout (Integer)

    how long to wait when something goes wrong (in ms)

  • max_timeout (Integer, nil)

    when exponential backoff is on, the maximum timeout the backoff can grow to; once reached, the timeout stays at this value

  • exponential_backoff (Boolean)

    whether to wait exponentially longer on each attempt or always use the same timeout value
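The private `backoff_interval` that `#pause` uses as its default is not shown on this page. As a rough illustration of how the three parameters could interact, the sketch below assumes the timeout doubles with every attempt and is capped at `max_timeout`. `sketch_backoff_interval` is a hypothetical helper written for this example, not the library's implementation.

```ruby
# Hypothetical helper (assumption, not Karafka's private method):
# doubles the base timeout per attempt and clamps it at max_timeout when given.
def sketch_backoff_interval(timeout:, max_timeout:, exponential_backoff:, attempt:)
  # Without exponential backoff, the wait is always the base timeout
  return timeout unless exponential_backoff

  interval = timeout * (2**attempt)

  # A nil max_timeout means "no cap"
  max_timeout ? [interval, max_timeout].min : interval
end

exponential = (0..5).map do |attempt|
  sketch_backoff_interval(
    timeout: 1_000, max_timeout: 10_000, exponential_backoff: true, attempt: attempt
  )
end
exponential #=> [1000, 2000, 4000, 8000, 10000, 10000]

flat = (0..2).map do |attempt|
  sketch_backoff_interval(
    timeout: 1_000, max_timeout: 10_000, exponential_backoff: false, attempt: attempt
  )
end
flat #=> [1000, 1000, 1000]
```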



# File 'lib/karafka/time_trackers/pause.rb', line 42

def initialize(timeout:, max_timeout:, exponential_backoff:)
  @started_at = nil
  @attempt = 0
  @timeout = timeout
  @current_timeout = timeout
  @max_timeout = max_timeout
  @exponential_backoff = exponential_backoff
  @mutex = Mutex.new
  super()
end

Instance Attribute Details

#attempt ⇒ Object (readonly)

Returns the value of attribute attempt.



# File 'lib/karafka/time_trackers/pause.rb', line 13

def attempt
  @attempt
end

#current_timeout ⇒ Object (readonly)

Returns the value of attribute current_timeout.



# File 'lib/karafka/time_trackers/pause.rb', line 13

def current_timeout
  @current_timeout
end

Instance Method Details

#expire ⇒ Object

Expires the pause immediately by clearing its end time, so that #expired? returns true.



# File 'lib/karafka/time_trackers/pause.rb', line 82

def expire
  @mutex.synchronize do
    @ends_at = nil
  end
end

#expired? ⇒ Boolean

Returns whether the pause has expired.

Returns:

  • (Boolean)

    true if the pause has expired or no end time is set



# File 'lib/karafka/time_trackers/pause.rb', line 96

def expired?
  @mutex.synchronize do
    @ends_at ? monotonic_now >= @ends_at : true
  end
end
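The check above compares a monotonic clock reading against the stored end time. Below is a minimal sketch of that comparison, assuming `monotonic_now` returns milliseconds read from the monotonic clock (the helper itself is not shown on this page). `sketch_monotonic_now` and `sketch_expired?` are hypothetical names used only for this illustration.

```ruby
# Assumed equivalent of the monotonic_now helper: monotonic clock in milliseconds.
# A monotonic clock never jumps backwards, unlike wall-clock time.
def sketch_monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# A deadline counts as expired when it is unset or already in the past
def sketch_expired?(ends_at)
  ends_at ? sketch_monotonic_now >= ends_at : true
end

deadline = sketch_monotonic_now + 50 # 50 ms from now

before_deadline = sketch_expired?(deadline) # still inside the pause window
sleep(0.1)                                  # wait past the deadline
after_deadline = sketch_expired?(deadline)  # window has elapsed
never_paused = sketch_expired?(nil)         # no end time set
```

Treating a missing end time as expired means a tracker that was never paused, or one that was explicitly expired, does not block processing.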

#increment ⇒ Object

Increments the attempt count by 1.



# File 'lib/karafka/time_trackers/pause.rb', line 67

def increment
  @mutex.synchronize do
    @attempt += 1
  end
end

#pause(timeout = backoff_interval) ⇒ Object

Note:

Providing this value can be useful when we explicitly want to pause for a certain period of time, outside of any regular pausing logic.

Pauses processing from now until the end of the interval (backoff or non-backoff). The attempt count is not changed here; call #increment to record the attempt.

Parameters:

  • timeout (Integer) (defaults to: backoff_interval)

    timeout value in milliseconds that overrides the default timeout



# File 'lib/karafka/time_trackers/pause.rb', line 58

def pause(timeout = backoff_interval)
  @mutex.synchronize do
    @current_timeout = timeout
    @started_at = monotonic_now
    @ends_at = @started_at + timeout
  end
end

#paused? ⇒ Boolean

Returns whether processing is currently paused.

Returns:

  • (Boolean)

    true if processing is currently paused



# File 'lib/karafka/time_trackers/pause.rb', line 89

def paused?
  @mutex.synchronize do
    !@started_at.nil?
  end
end

#reset ⇒ Object

Resets the pause attempt count.



# File 'lib/karafka/time_trackers/pause.rb', line 103

def reset
  @mutex.synchronize do
    @attempt = 0
  end
end

#resume ⇒ Object

Marks the pause as resumed.



# File 'lib/karafka/time_trackers/pause.rb', line 74

def resume
  @mutex.synchronize do
    @started_at = nil
    @ends_at = nil
  end
end