Class: Karafka::Pro::Cli::ParallelSegments::Collapse
- Defined in:
- lib/karafka/pro/cli/parallel_segments/collapse.rb
Overview
Note:
Running this may cause some messages to be processed twice if the final offsets of the parallel segments are not aligned.
Note:
This will not remove the parallel segments consumer groups. Please use the Admin API if you want them to be removed.
Takes the committed offsets of each parallel segment for each topic and records them back onto the segment origin consumer group. Without --force, it raises an error on conflicts. With --force, it takes the lowest offset for each topic partition as the baseline.
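The rule described above can be illustrated with a minimal, self-contained sketch. It is not Karafka's implementation: the `collapse_offsets` method, the `ConflictError` class, and the shape of the `offsets` hash (segment group name => topic => partition => committed offset) are assumptions made purely for illustration.

```ruby
# Hypothetical error type used only in this sketch; not part of Karafka.
class ConflictError < StandardError; end

# Hypothetical helper illustrating the collapse rule.
# offsets: { "segment-group" => { "topic" => { partition => committed_offset } } }
def collapse_offsets(offsets, force: false)
  # Regroup as { [topic, partition] => [offsets reported by each segment] }
  grouped = Hash.new { |hash, key| hash[key] = [] }

  offsets.each_value do |topics|
    topics.each do |topic, partitions|
      partitions.each do |partition, offset|
        grouped[[topic, partition]] << offset
      end
    end
  end

  grouped.each_with_object({}) do |((topic, partition), values), result|
    # Without force, any disagreement between segments is treated as a conflict
    if values.uniq.size > 1 && !force
      raise ConflictError, "#{topic}/#{partition} has misaligned offsets: #{values.inspect}"
    end

    # The lowest offset becomes the baseline so that no message is skipped
    (result[topic] ||= {})[partition] = values.min
  end
end

segments = {
  "group-parallel-0" => { "events" => { 0 => 120, 1 => 80 } },
  "group-parallel-1" => { "events" => { 0 => 100, 1 => 80 } }
}

collapse_offsets(segments, force: true)
```

In this example, partition 0 of `events` collapses to offset 100. Any messages in the 100..119 range that the first segment had already handled would then be processed again, which is the double processing the note above warns about. Calling `collapse_offsets(segments)` without `force: true` raises `ConflictError` instead.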
Instance Method Summary
- #call ⇒ Object
Runs the collapse operation.
Methods inherited from Base
Methods included from Helpers::Colorize
Constructor Details
This class inherits a constructor from Karafka::Pro::Cli::ParallelSegments::Base
Instance Method Details
#call ⇒ Object
Runs the collapse operation
# File 'lib/karafka/pro/cli/parallel_segments/collapse.rb', line 39

def call
  puts "Starting parallel segments collapse..."

  segments_count = applicable_groups.size

  if segments_count.zero?
    puts "#{red("No")} consumer groups with parallel segments configuration found"

    return
  end

  puts(
    "Found #{green(segments_count)} consumer groups with parallel segments configuration"
  )

  collapses = []

  applicable_groups.each do |segment_origin, segments|
    puts
    puts "Collecting group #{yellow(segment_origin)} details..."
    offsets = collect_offsets(segment_origin, segments)

    unless options.key?(:force)
      puts
      puts "Validating offsets positions for #{yellow(segment_origin)} consumer group..."
      validate!(offsets, segment_origin)
    end

    puts
    puts "Computing collapsed offsets for #{yellow(segment_origin)} consumer group..."
    collapses << collapse(offsets, segments)
  end

  collapses.each do |collapse|
    apply(collapse)
  end

  puts
  puts "Collapse completed #{green("successfully")}!"
end
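One detail of the listing is that every collapse is computed (and, without --force, validated) before any of them is applied, so a conflict raised for a later group appears to abort the run before any offsets are written. Below is a minimal sketch of that collect-then-apply pattern. The `run_two_phase` method and the `validator` and `applier` lambdas are hypothetical stand-ins, not Karafka's private helpers.

```ruby
# Hypothetical stand-in for the two-phase flow: plan everything, then apply.
def run_two_phase(groups, validator:, applier:)
  # Phase 1: validate and collect; any raise here leaves nothing applied
  plans = groups.map do |name, offsets|
    validator.call(name, offsets)
    [name, offsets.min]
  end

  # Phase 2: apply only after every group has passed validation
  plans.each { |name, offset| applier.call(name, offset) }
end

applied = []
validator = ->(name, offsets) { raise ArgumentError, "#{name} misaligned" if offsets.uniq.size > 1 }
applier = ->(name, offset) { applied << [name, offset] }

begin
  run_two_phase({ "a" => [5, 5], "b" => [7, 9] }, validator: validator, applier: applier)
rescue ArgumentError => e
  puts "Aborted: #{e.message}"
end

# Group "a" was valid, but nothing was written because "b" failed validation
applied
```

The design choice here is that a partial collapse, where some origin groups are updated and others are not, is avoided when validation fails.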