Class: Redstream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/redstream/consumer.rb

Overview

The Redstream::Consumer class to read messages from a specified redis stream in batches.

Examples:

Redstream::Consumer.new(name: "user_indexer", stream_name: "users").run do |messages|
  # ...
end

Instance Method Summary collapse

Constructor Details

#initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) ⇒ Consumer

Initializes a new consumer instance. Please note that you can have multiple consumers per stream, by specifying different names.

Parameters:

  • name (String)

    The consumer name. The name is used for locking

  • stream_name (String)

    The name of the redis stream. Please note that redstream adds a prefix to the redis keys. However, the stream_name param must be specified without any prefixes here. When using Redstream::Model, the stream name is the downcased, pluralized and underscored version of the model name. I.e., the stream name for a ‘User’ model will be ‘users’

  • batch_size (Fixnum) (defaults to: 1_000)

    The desired batch size, that is the number of messages yielded at max. More concretely, the number of messages yielded may be lower the batch_size, but not higher

  • logger (Logger) (defaults to: Logger.new("/dev/null"))

    The logger used for error logging



28
29
30
31
32
33
34
35
# File 'lib/redstream/consumer.rb', line 28

def initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null"))
  @name = name
  @stream_name = stream_name
  @batch_size = batch_size
  @logger = logger
  @redis = Redstream.connection_pool.with(&:dup)
  @lock = Lock.new(name: "consumer:#{@stream_name}:#{@name}")
end

Instance Method Details

#commit(offset) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Commits the specified offset/ID as the maximum ID already read, such that subsequent read calls will use this offset/ID as a starting point.

Parameters:

  • offset (String)

    The offset/ID to commit



109
110
111
# File 'lib/redstream/consumer.rb', line 109

def commit(offset)
  @redis.set Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name), offset
end

#max_committed_idString?

Returns its maximum committed id, i.e. the consumer’s offset.

Returns:

  • (String, nil)

    The committed id, or nil



41
42
43
# File 'lib/redstream/consumer.rb', line 41

def max_committed_id
  @redis.get Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name)
end

#run(&block) ⇒ Object

Loops and thus blocks forever while reading messages from the specified stream and yielding them in batches.

Examples:

consumer.run do |messages|
  # ...
end


53
54
55
# File 'lib/redstream/consumer.rb', line 53

def run(&block)
  loop { run_once(&block) }
end

#run_once(&block) ⇒ Object

Reads a single batch from the specified stream and yields it. You usually want to use the #run method instead, which loops/blocks forever.

Examples:

consumer.run_once do |messages|
  # ...
end


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/redstream/consumer.rb', line 65

def run_once(&block)
  got_lock = @lock.acquire do
    offset = @redis.get(Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name))
    offset ||= "0-0"

    stream_key_name = Redstream.stream_key_name(@stream_name)

    response = begin
      @redis.xread(stream_key_name, offset, count: @batch_size, block: 5_000)
    rescue Redis::TimeoutError
      nil
    end

    return if response.nil? || response[stream_key_name].nil? || response[stream_key_name].empty?

    messages = response[stream_key_name].map do |raw_message|
      Message.new(raw_message)
    end

    block.call(messages)

    offset = response[stream_key_name].last[0]

    return unless offset

    commit offset
  end

  sleep(5) unless got_lock
rescue StandardError => e
  @logger.error e

  sleep 5

  retry
end