Module: Karafka::Pro::Processing::Strategies::Ftr::Default

Includes:
Default
Included in:
Dlq::Ftr, Dlq::FtrLrjMom, Dlq::FtrMom, Vp, Lrj::Ftr, Mom::Ftr
Defined in:
lib/karafka/pro/processing/strategies/ftr/default.rb

Overview

Only filtering enabled

Constant Summary collapse

FEATURES =

Just filtering enabled

%i[
  filtering
].freeze

Instance Method Summary collapse

Methods included from Default

#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_consume, #handle_revoked, #handle_shutdown

Instance Method Details

#handle_after_consumeObject

Standard flow with the filtering feature enabled



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 35

# Post-consumption hook executed inside `coordinator.on_finished`.
#
# On success the pause tracker is reset, the last message of the group is marked as
# consumed and post-filtering is applied. On failure processing is retried after a pause.
#
# @note `return` inside the block is a non-local return: it exits this whole method,
#   not only the block.
def handle_after_consume
  coordinator.on_finished do |final_message|
    # Nothing more to do once revoked
    return if revoked?

    # Failure path: the block evaluates to whatever the retry returns
    next retry_after_pause unless coordinator.success?

    coordinator.pause_tracker.reset

    # When a manual pause happened we must not mark the last message, otherwise the
    # marking could override that pause upon rebalance
    return if coordinator.manual_pause?

    mark_as_consumed(final_message)
    handle_post_filtering
  end
end

#handle_idleObject

Empty run when running on idle means we need to filter



30
31
32
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 30

# Idle-run hook. An empty (idle) run still needs the filter outcome applied, so this
# simply delegates to #handle_post_filtering and returns its result.
def handle_idle
  handle_post_filtering
end

#handle_post_filteringBoolean

Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired. Throttling may expire because we throttle before processing starts and we need to compensate for processing time. It may turn out that we don’t have to pause but we need to move the offset because we skipped some messages due to throttling filtering.

Returns:

  • (Boolean)

    whether any form of throttling operation (pause or seek) was needed



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 61

# Applies the outcome of the coordinator's filter after a (possibly idle) run.
#
# Optionally marks the filter cursor as consumed and then, depending on
# `filter.action`, does nothing (`:skip`), seeks to the cursor offset and resumes
# (`:seek`) or pauses on the cursor offset for the filter timeout (`:pause`).
#
# @return [Boolean] documented as "was any form of throttling operation needed".
#   NOTE(review): the `:skip` and manual-pause paths return nil and the remaining paths
#   return whatever `resume` / `monitor.instrument` return - confirm callers do not
#   rely on a strict boolean.
# @raise [Karafka::Errors::UnsupportedCaseError] when `filter.action` is none of
#   `:skip`, `:seek` or `:pause`
def handle_post_filtering
  filter = coordinator.filter

  # We pick the timeout before the action because every action takes time. This time
  # may then mean we end up having throttle time equal to zero when pause is needed
  # and this should not happen
  throttle_timeout = filter.timeout

  # If user requested marking when applying filter, we mark. We may be in the user
  # flow but even then this is not a problem. Older offsets will be ignored since
  # we do not force the offset update (expected) and newer are on the user to control.
  # This can be primarily used when filtering large quantities of data to mark on the
  # idle runs, so lag reporting is aware that those messages were not consumed but also
  # are no longer relevant
  if filter.mark_as_consumed?
    send(
      filter.marking_method,
      filter.cursor
    )
  end

  case filter.action
  when :skip
    nil
  when :seek
    # User direct actions take priority over automatic operations
    # If we've already seeked we can just resume operations, nothing extra needed
    return resume if coordinator.manual_seek?

    throttle_message = filter.cursor

    monitor.instrument(
      'filtering.seek',
      caller: self,
      message: throttle_message
    ) do
      seek(throttle_message.offset, false)
    end

    resume
  when :pause
    # User direct actions take priority over automatic operations
    return nil if coordinator.manual_pause?

    throttle_message = filter.cursor

    monitor.instrument(
      'filtering.throttled',
      caller: self,
      message: throttle_message,
      timeout: throttle_timeout
    ) do
      pause(throttle_message.offset, throttle_timeout, false)
    end
  else
    # The comma matters: without it Ruby parses the constant path followed by an
    # argument as a method call on Karafka::Errors and raises NoMethodError instead
    raise Karafka::Errors::UnsupportedCaseError, filter.action
  end
end