Class: Karafka::Pro::Processing::Filters::Throttler

Inherits:
Base
  • Object
show all
Defined in:
lib/karafka/pro/processing/filters/throttler.rb

Overview

Throttler used to limit number of messages we can process in a given time interval The tricky thing is, that even if we throttle on 100 messages, if we’ve reached 100, we still need to indicate, that we throttle despite not receiving 101. Otherwise we will not pause the partition and will fetch more data that we should not process.

This is a special type of a filter that always throttles and makes us wait / seek if anything is applied out.

Instance Attribute Summary

Attributes inherited from Base

#cursor

Instance Method Summary collapse

Methods inherited from Base

#applied?, #mark_as_consumed?, #marking_method

Constructor Details

#initialize(limit, interval) ⇒ Throttler

Returns a new instance of Throttler.

Parameters:

  • limit (Integer)

    how many messages we can process in a given time

  • interval (Integer)

    interval in milliseconds for which we want to process



29
30
31
32
33
34
35
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 29

def initialize(limit, interval)
  super()

  @limit = limit
  @interval = interval
  @requests = Hash.new { |h, k| h[k] = 0 }
end

Instance Method Details

#actionSymbol

Returns action to take upon throttler reaching certain state.

Returns:

  • (Symbol)

    action to take upon throttler reaching certain state



66
67
68
69
70
71
72
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 66

def action
  if applied?
    timeout.zero? ? :seek : :pause
  else
    :skip
  end
end

#apply!(messages) ⇒ Object

Limits number of messages to a range that we can process (if needed) and keeps track of how many messages we’ve processed in a given time

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    limits the number of messages to number we can accept in the context of throttling constraints



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 41

def apply!(messages)
  @applied = false
  @cursor = nil
  @time = monotonic_now
  @requests.delete_if { |timestamp, _| timestamp < (@time - @interval) }
  values = @requests.values.sum
  accepted = 0

  messages.delete_if do |message|
    # +1 because of current
    @applied = (values + accepted + 1) > @limit

    @cursor = message if @applied && @cursor.nil?

    next true if @applied

    accepted += 1

    false
  end

  @requests[@time] += accepted
end

#timeoutInteger

Returns minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message.

Returns:

  • (Integer)

    minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message



76
77
78
79
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 76

def timeout
  timeout = @interval - (monotonic_now - @time)
  timeout <= 0 ? 0 : timeout
end