Class: Redstream::Trimmer

Inherits:
Object
Defined in:
lib/redstream/trimmer.rb

Overview

The Redstream::Trimmer class is necessary to clean up messages after all consumers have successfully processed and committed them. Otherwise, they would fill up Redis and eventually bring it down with out-of-memory errors. The Trimmer sleeps for the specified interval when there is nothing to trim. Please note that you must pass an array containing the names of all consumers reading from the stream that is about to be trimmed. Otherwise, the Trimmer could trim messages from the stream before all consumers have received them.

Examples:

trimmer = Redstream::Trimmer.new(
  interval: 30,
  stream_name: "users",
  consumer_names: ["indexer", "cacher"]
)

trimmer.run
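
The requirement to pass every consumer name can be illustrated without Redis. The following is a minimal in-memory sketch, not Redstream code: MESSAGES, COMMITTED, parse_id and trimmable_ids are hypothetical names, and the ids are compared numerically here as a simplification. It shows that leaving out a slow consumer lets messages be trimmed that this consumer has not committed yet.

```ruby
# In-memory stand-ins for a stream and the committed consumer offsets.
# All names here are hypothetical and only illustrate the idea.
MESSAGES = ["1-0", "2-0", "3-0", "4-0"].freeze
COMMITTED = { "indexer" => "4-0", "cacher" => "2-0" }.freeze

# Turns an id such as "3-0" into [3, 0] so that ids compare numerically.
def parse_id(id)
  id.split("-").map(&:to_i)
end

# Returns the ids that would be trimmed for the given consumer names:
# everything up to and including the smallest committed offset.
# Consumers without a committed offset are skipped.
def trimmable_ids(consumer_names)
  offsets = consumer_names.map { |name| COMMITTED[name] }.compact
  min_committed = offsets.min_by { |id| parse_id(id) }
  return [] unless min_committed

  MESSAGES.select { |id| (parse_id(id) <=> parse_id(min_committed)) <= 0 }
end

p trimmable_ids(["indexer", "cacher"]) # => ["1-0", "2-0"]
p trimmable_ids(["indexer"])           # => ["1-0", "2-0", "3-0", "4-0"]
```

In the second call the "cacher" consumer is missing from the list, so the messages "3-0" and "4-0" would be trimmed although "cacher" has only committed up to "2-0".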

Instance Method Summary

Constructor Details

#initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null")) ⇒ Trimmer

Initializes a new trimmer. Accepts the interval to sleep for when there is nothing to trim, the stream name, the consumer names, and a logger for debug log messages.

Parameters:

  • interval (Integer, Float)

    The time to sleep, in seconds, when there is nothing to trim.

  • stream_name (String)

    The name of the stream that should be trimmed. Please note that redstream adds a prefix to the redis keys. However, the stream_name param must be specified without any prefixes here. When using Redstream::Model, the stream name is the downcased, pluralized and underscored version of the model name. For example, the stream name for a ‘User’ model will be ‘users’.

  • consumer_names (Array)

    The names of all consumers reading from the stream. Messages are only trimmed up to the smallest offset committed by these consumers.

  • logger (Logger) (defaults to: Logger.new("/dev/null"))

    A logger used for debug and error messages.



# File 'lib/redstream/trimmer.rb', line 37

def initialize(interval:, stream_name:, consumer_names:, logger: Logger.new("/dev/null"))
  @interval = interval
  @stream_name = stream_name
  @consumer_names = consumer_names
  @logger = logger
  @lock = Lock.new(name: "trimmer:#{stream_name}")
end
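
Because the default logger writes to /dev/null, the debug messages are discarded unless another logger is passed. The following sketch uses only Ruby's standard Logger and a StringIO as the log target, both chosen for illustration. It shows a logger at debug level that could be handed over via the logger: keyword, and writes a sample line in the format used by #run_once.

```ruby
require "logger"
require "stringio"

# A StringIO is used as the log target so the sketch runs anywhere;
# in practice $stdout or a log file would be more typical.
io = StringIO.new

logger = Logger.new(io)
logger.level = Logger::DEBUG # debug level is needed to see the trim messages

# Sample message in the same format as the one logged by #run_once.
logger.debug "Trimmed 3 messages from users"

puts io.string

# The logger could then be passed to the constructor, e.g.:
#   Redstream::Trimmer.new(interval: 30, stream_name: "users",
#                          consumer_names: ["indexer"], logger: logger)
```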

Instance Method Details

#run ⇒ Object

Loops and blocks forever, trimming messages from the specified Redis stream.



# File 'lib/redstream/trimmer.rb', line 48

def run
  loop { run_once }
end
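
Since #run never returns, one option is to give it a thread of its own. The following is a sketch under assumptions, not a recommendation from the library: FakeTrimmer is a hypothetical stand-in that only mimics the blocking #run contract, so the example runs without Redis.

```ruby
# Hypothetical stand-in with the same blocking #run contract as the trimmer.
class FakeTrimmer
  attr_reader :runs

  def initialize
    @runs = 0
  end

  # Counts the call and sleeps briefly, as if there was nothing to trim.
  def run_once
    @runs += 1
    sleep 0.01
  end

  # Blocks forever, like Redstream::Trimmer#run.
  def run
    loop { run_once }
  end
end

trimmer = FakeTrimmer.new

# Run the blocking loop in a dedicated thread so the caller stays responsive.
thread = Thread.new { trimmer.run }

sleep 0.1   # the main thread is free to do other work here
thread.kill # stop the loop, e.g. on shutdown
thread.join

puts "run_once calls: #{trimmer.runs}"
```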

#run_once ⇒ Object

Runs the trimming a single time. You usually want to use the #run method instead, which loops/blocks forever.



# File 'lib/redstream/trimmer.rb', line 55

def run_once
  got_lock = @lock.acquire do
    min_committed_id = Redstream.connection_pool.with do |redis|
      offset_key_names = @consumer_names.map do |consumer_name|
        Redstream.offset_key_name(stream_name: @stream_name, consumer_name: consumer_name)
      end

      redis.mget(offset_key_names).map(&:to_s).reject(&:empty?).min
    end

    return sleep(@interval) unless min_committed_id

    loop do
      messages = Redstream.connection_pool.with do |redis|
        redis.xrange(Redstream.stream_key_name(@stream_name), "-", min_committed_id, count: 1_000)
      end

      return sleep(@interval) if messages.nil? || messages.empty?

      Redstream.connection_pool.with { |redis| redis.xdel Redstream.stream_key_name(@stream_name), messages.map(&:first) }

      @logger.debug "Trimmed #{messages.size} messages from #{@stream_name}"
    end
  end

  sleep(5) unless got_lock
rescue StandardError => e
  @logger.error e

  sleep 5

  retry
end
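
The inner loop above reads at most 1,000 messages up to the smallest committed id, deletes them, and repeats until nothing is left. The batching can be sketched in memory as follows. This is an illustration only: trim_in_batches is a hypothetical helper, and plain integers stand in for the stream ids.

```ruby
# Deletes all ids up to and including max_id from the ids array,
# at most batch_size per round, and returns the number of rounds.
def trim_in_batches(ids, max_id, batch_size)
  batches = 0

  loop do
    # Counterpart of the xrange call: the next batch of trimmable ids.
    batch = ids.select { |id| id <= max_id }.first(batch_size)

    # The real method sleeps for the interval and returns at this point.
    break if batch.empty?

    # Counterpart of the xdel call: remove the batch from the stream.
    batch.each { |id| ids.delete(id) }
    batches += 1
  end

  batches
end

ids = (1..2500).to_a
batches = trim_in_batches(ids, 2200, 1000)

puts batches  # => 3
puts ids.size # => 300
```

Working in fixed-size batches keeps each read and delete bounded, regardless of how many messages have piled up in the stream.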