Class: Redstream::Trimmer
- Inherits: Object
  - Object
  - Redstream::Trimmer
- Defined in: lib/redstream/trimmer.rb
Overview
The Redstream::Trimmer class is necessary to clean up messages after all consumers have successfully processed and committed them. Otherwise, the messages would accumulate in Redis and eventually bring it down with out-of-memory errors. The Trimmer sleeps for the specified interval when there is nothing to trim. Note that you must pass an array containing the names of all consumers reading from the stream that is to be trimmed. Otherwise, the Trimmer could trim messages from the stream before every consumer has received them.
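To illustrate why every consumer name must be listed, here is a minimal sketch in plain Ruby, with no Redis involved. The helpers `parse_stream_id` and `safe_trim_upper_bound`, the consumer names, and the offsets hash are all hypothetical and not part of Redstream. The sketch also assumes IDs of the form `ms-seq` and compares them numerically, whereas the source of #run_once calls `min` on the raw strings.

```ruby
# Hypothetical helper, not part of Redstream: split a stream ID of the
# form "ms-seq" into two integers so IDs can be compared numerically.
def parse_stream_id(id)
  ms, seq = id.split("-", 2)
  [Integer(ms), Integer(seq || "0")]
end

# Hypothetical helper: returns the smallest committed ID among the listed
# consumers, or nil if none of them has committed anything yet.
# Everything up to and including this ID is safe to trim, but only with
# respect to the consumers that were actually listed.
def safe_trim_upper_bound(committed_offsets, consumer_names)
  ids = consumer_names.map { |name| committed_offsets[name].to_s }.reject(&:empty?)
  ids.min_by { |id| parse_stream_id(id) }
end

# Made-up committed offsets for two made-up consumers.
offsets = {
  "indexer" => "1700000000000-5",
  "mailer"  => "1700000000000-12"
}

safe_trim_upper_bound(offsets, ["indexer", "mailer"]) # => "1700000000000-5"
safe_trim_upper_bound(offsets, ["mailer"])            # => "1700000000000-12"
```

In the second call the slower consumer is left out, so the bound moves past messages it has not yet committed. Trimming up to that bound would delete them before that consumer reads them.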
Instance Method Summary
- #initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null")) ⇒ Trimmer (constructor)
  Initializes a new trimmer.
- #run ⇒ Object
  Loops and blocks forever trimming messages from the specified Redis stream.
- #run_once ⇒ Object
  Runs the trimming a single time.
Constructor Details
#initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null")) ⇒ Trimmer
Initializes a new trimmer. Accepts an interval (in seconds) to sleep for when there is nothing to trim, the stream name, the names of all consumers reading from the stream, and a logger for debug and error log messages.
# File 'lib/redstream/trimmer.rb', line 37

def initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null"))
  @interval = interval
  @stream_name = stream_name
  @consumer_names = consumer_names
  @logger = logger
  @lock = Lock.new(name: "trimmer:#{stream_name}")
end
Instance Method Details
#run ⇒ Object
Loops and blocks forever trimming messages from the specified Redis stream.
# File 'lib/redstream/trimmer.rb', line 48

def run
  loop { run_once }
end
#run_once ⇒ Object
Runs the trimming a single time. You usually want to use the #run method instead, which loops/blocks forever.
# File 'lib/redstream/trimmer.rb', line 55

def run_once
  got_lock = @lock.acquire do
    # Find the smallest committed offset among all listed consumers.
    min_committed_id = Redstream.connection_pool.with do |redis|
      offset_key_names = @consumer_names.map do |consumer_name|
        Redstream.offset_key_name(stream_name: @stream_name, consumer_name: consumer_name)
      end

      redis.mget(offset_key_names).map(&:to_s).reject(&:empty?).min
    end

    return sleep(@interval) unless min_committed_id

    # Delete messages up to that offset in batches of at most 1,000.
    loop do
      messages = Redstream.connection_pool.with do |redis|
        redis.xrange(Redstream.stream_key_name(@stream_name), "-", min_committed_id, count: 1_000)
      end

      return sleep(@interval) if messages.nil? || messages.empty?

      Redstream.connection_pool.with { |redis| redis.xdel Redstream.stream_key_name(@stream_name), messages.map(&:first) }

      @logger.debug "Trimmed #{messages.size} messages from #{@stream_name}"
    end
  end

  sleep(5) unless got_lock
rescue StandardError => e
  @logger.error e

  sleep 5

  retry
end
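The batched deletion in #run_once can be sketched without Redis as follows. This is an illustration under simplifying assumptions, not Redstream code: the stream is a plain in-memory array of `[id, fields]` pairs, IDs are plain integers rather than real `ms-seq` stream IDs, and `trim_in_batches` is a hypothetical name.

```ruby
# Hypothetical in-memory stand-in for the trimming loop. `stream` is an
# array of [id, fields] pairs ordered by id; it is modified in place.
# Returns the number of entries removed.
def trim_in_batches(stream, upper_bound, batch_size: 1_000)
  trimmed = 0

  loop do
    # Plays the role of XRANGE from "-" to the upper bound with a COUNT:
    # the oldest entries whose id is at most upper_bound, capped per batch.
    batch = stream.select { |id, _fields| id <= upper_bound }.first(batch_size)
    break if batch.empty?

    # Plays the role of XDEL with the ids of the fetched entries.
    ids = batch.map(&:first)
    stream.reject! { |id, _fields| ids.include?(id) }

    trimmed += batch.size
  end

  trimmed
end

# Five made-up messages with ids 1..5; trim everything up to id 3,
# two entries per batch.
stream = (1..5).map { |id| [id, { "payload" => "message #{id}" }] }

trim_in_batches(stream, 3, batch_size: 2) # => 3
stream.map(&:first)                       # => [4, 5]
```

Working in bounded batches keeps each round trip small. The loop ends once a fetch returns nothing, which corresponds to the point where #run_once sleeps for the configured interval.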