Class: Karafka::Pro::Processing::Coordinators::ErrorsTracker

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/karafka/pro/processing/coordinators/errors_tracker.rb

Overview

Tracks errors across consecutive processing attempts so that recovery flows can be built based on the type of error.
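As an illustration of an error-type based recovery flow, here is a minimal sketch. Everything in it is hypothetical: the error classes, the `FakeTracker` stand-in, the `recovery_action` method, and the returned symbols are not part of the documented API. The sketch relies only on the `#last` and `#counts` readers described on this page.

```ruby
# Hypothetical error classes used only for this illustration.
TransientNetworkError = Class.new(StandardError)
MalformedPayloadError = Class.new(StandardError)

# Minimal stand-in exposing two of the readers documented on this page
# (#last and #counts). It is not the Karafka class.
FakeTracker = Struct.new(:last, :counts)

# Hypothetical policy: the method name and the returned symbols are
# illustrative assumptions, not a Karafka contract.
def recovery_action(tracker)
  # A payload that cannot be parsed will not improve on retry.
  return :skip if tracker.last.is_a?(MalformedPayloadError)

  # Repeated network failures: stop retrying and escalate.
  # fetch reads the count without inserting a new key into the hash.
  return :escalate if tracker.counts.fetch(TransientNetworkError, 0) >= 3

  :retry
end

flaky  = FakeTracker.new(TransientNetworkError.new, { TransientNetworkError => 1 })
stuck  = FakeTracker.new(TransientNetworkError.new, { TransientNetworkError => 3 })
broken = FakeTracker.new(MalformedPayloadError.new, { MalformedPayloadError => 1 })

puts recovery_action(flaky)  # retry
puts recovery_action(stuck)  # escalate
puts recovery_action(broken) # skip
```

Branching on the last error first and on the per-class counts second keeps the policy readable. A real flow would likely need more cases.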

Constructor Details

#initialize(topic, partition, limit: STORAGE_LIMIT) ⇒ ErrorsTracker

Note:

limit caps only the number of stored error objects. The per-class counts are not capped and keep increasing after the storage limit has been reached.

Returns a new instance of ErrorsTracker.

Parameters:

  • topic (Karafka::Routing::Topic)
  • partition (Integer)
  • limit (Integer) (defaults to: STORAGE_LIMIT)

    max number of errors we want to keep for reference when implementing custom error handling.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 58

def initialize(topic, partition, limit: STORAGE_LIMIT)
  @errors = []
  @counts = Hash.new { |hash, key| hash[key] = 0 }
  @topic = topic
  @partition = partition
  @limit = limit
  @trace_id = SecureRandom.uuid
end
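The constructor builds `@counts` with a default block, which determines how the counts hash behaves when read. The following plain-Ruby sketch uses only the standard `Hash` API. The error classes are arbitrary examples.

```ruby
# Same default block as in the constructor: a missing key is stored as 0
# the first time it is read.
counts = Hash.new { |hash, key| hash[key] = 0 }

counts[ArgumentError] += 1
counts[ArgumentError] += 1
counts[KeyError] += 1

puts counts[ArgumentError] # 2
puts counts.values.sum     # 3

# Reading an unseen class through [] inserts it with a zero count.
counts[TypeError]
puts counts.key?(TypeError) # true

# fetch with a fallback reads without inserting a new key.
puts counts.fetch(IOError, 0) # 0
puts counts.key?(IOError)     # false
```

Because `[]` inserts missing keys, code that only inspects the counts may prefer `fetch(klass, 0)` to avoid adding zero-count entries as a side effect.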

Instance Attribute Details

#counts ⇒ Hash (readonly)

Returns:

  • (Hash)

    number of tracked errors per error class


# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 40

def counts
  @counts
end

#partition ⇒ Integer (readonly)

Returns partition of this error tracker.

Returns:

  • (Integer)

    partition of this error tracker



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 37

def partition
  @partition
end

#topic ⇒ Karafka::Routing::Topic (readonly)

Returns topic of this error tracker.

Returns:

  • (Karafka::Routing::Topic)

    topic of this error tracker



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 34

def topic
  @topic
end

#trace_id ⇒ String (readonly)

Returns:

  • (String)


# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 43

def trace_id
  @trace_id
end

Instance Method Details

#<<(error) ⇒ Object

Parameters:

  • error (StandardError)

    error to add to the tracker



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 74

def <<(error)
  @errors.shift if @errors.size >= @limit
  @errors << error
  @counts[error.class] += 1
  @trace_id = SecureRandom.uuid
end
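To make the rolling storage concrete, here is a simplified stand-in that mirrors the source listed on this page. `SketchTracker` is a hypothetical name. It omits topic and partition, and the limit of 3 is arbitrary for the demo.

```ruby
require "securerandom"

# Simplified stand-in mirroring the listed source; not the Karafka class itself.
class SketchTracker
  attr_reader :counts, :trace_id

  def initialize(limit: 3) # 3 is an arbitrary limit chosen for the demo
    @errors = []
    @counts = Hash.new { |hash, key| hash[key] = 0 }
    @limit = limit
    @trace_id = SecureRandom.uuid
  end

  def <<(error)
    @errors.shift if @errors.size >= @limit # drop the oldest stored error
    @errors << error
    @counts[error.class] += 1               # counts are never trimmed
    @trace_id = SecureRandom.uuid           # new trace id for every error
  end

  def all
    @errors
  end
end

tracker = SketchTracker.new(limit: 3)
first_trace = tracker.trace_id

5.times { |i| tracker << ArgumentError.new("failure #{i}") }

puts tracker.all.map(&:message).inspect # ["failure 2", "failure 3", "failure 4"]
puts tracker.counts[ArgumentError]      # 5
puts tracker.trace_id != first_trace    # true
```

Since the last expression in `#<<` assigns `@trace_id`, the method as listed returns the new trace id rather than the tracker, so calls such as `tracker << a << b` cannot be chained.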

#all ⇒ Array<StandardError>

Returns array with all the stored errors, that is, at most the limit most recent ones.

Returns:

  • (Array<StandardError>)

    array with all the stored errors, that is, at most the limit most recent ones



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 104

def all
  @errors
end

#clear ⇒ Object

Clears all stored errors and their counts.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 68

def clear
  @errors.clear
  @counts.clear
end

#each ⇒ Object

Iterates over the stored errors.



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 99

def each(&)
  @errors.each(&)
end
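Because the class includes Enumerable and defines `#each`, the standard Enumerable methods become available on top of the stored errors. The sketch below shows the same pattern with a tiny hypothetical class, `ErrorList`, which is not the Karafka class. It uses a named block parameter so that it also runs on Ruby versions without anonymous block forwarding.

```ruby
# Tiny hypothetical class: like the tracker, it includes Enumerable and
# delegates #each to an internal array of errors.
class ErrorList
  include Enumerable

  def initialize(errors)
    @errors = errors
  end

  def each(&block)
    @errors.each(&block)
  end
end

errors = ErrorList.new(
  [
    ArgumentError.new("bad input"),
    KeyError.new("missing id"),
    ArgumentError.new("bad format")
  ]
)

# Enumerable builds all of these on top of #each.
puts errors.grep(ArgumentError).map(&:message).inspect # ["bad input", "bad format"]
puts errors.any?(KeyError)                              # true
puts errors.map(&:class).tally[ArgumentError]           # 2
```

`grep` and `any?` with a class argument match through `===`, which makes them convenient for filtering by error type.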

#empty? ⇒ Boolean

Returns whether the error tracker is empty.

Returns:

  • (Boolean)

    whether the error tracker is empty



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 82

def empty?
  @errors.empty?
end

#last ⇒ StandardError?

Returns last error that occurred or nil if no errors.

Returns:

  • (StandardError, nil)

    last error that occurred or nil if no errors



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 94

def last
  @errors.last
end

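#size ⇒ Integer

Returns total number of tracked errors, including those no longer kept in storage.

Returns:

  • (Integer)

    total number of tracked errors, including those no longer kept in storage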



# File 'lib/karafka/pro/processing/coordinators/errors_tracker.rb', line 87

def size
  # We use counts reference of all errors and not the `@errors` array because it allows
  # us to go beyond the whole errors storage limit
  @counts.values.sum
end
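One consequence of summing the counts is that `#size` can differ from the number of errors that iteration yields. The sketch below uses a reduced, hypothetical stand-in (`CountingSketch`) that keeps only the parts of the listed source needed to show this. The limit of 2 is arbitrary.

```ruby
# Reduced stand-in with only #<<, #each and #size; not the Karafka class itself.
class CountingSketch
  include Enumerable

  def initialize(limit:)
    @errors = []
    @counts = Hash.new { |hash, key| hash[key] = 0 }
    @limit = limit
  end

  def <<(error)
    @errors.shift if @errors.size >= @limit
    @errors << error
    @counts[error.class] += 1
  end

  def each(&block)
    @errors.each(&block)
  end

  # Sums the counts, so it keeps growing past the storage limit.
  def size
    @counts.values.sum
  end
end

tracker = CountingSketch.new(limit: 2)
4.times { tracker << RuntimeError.new("boom") }

puts tracker.size      # 4, every error ever added
puts tracker.count     # 2, Enumerable#count walks the stored errors via #each
puts tracker.to_a.size # 2
```

When the number of errors still available for inspection is needed, `count` or `all.size` is the value to use. `size` answers how many errors occurred in total.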