Module: Redstream

Defined in:
lib/redstream.rb,
lib/redstream/lock.rb,
lib/redstream/model.rb,
lib/redstream/delayer.rb,
lib/redstream/message.rb,
lib/redstream/trimmer.rb,
lib/redstream/version.rb,
lib/redstream/consumer.rb,
lib/redstream/producer.rb

Defined Under Namespace

Modules: Model

Classes: Consumer, Delayer, Lock, Message, Producer, Trimmer

Constant Summary

VERSION = "0.5.0"

Class Method Details

.base_key_name ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the base key name used as the prefix for all redis keys, i.e. the namespace (if one is set) joined with "redstream".



# File 'lib/redstream.rb', line 140

def self.base_key_name
  [namespace, "redstream"].compact.join(":")
end
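The effect of the namespace on the base key can be illustrated with a small standalone sketch. The helper name base_key_for is hypothetical and not part of Redstream; it only mirrors the join logic shown above and takes the namespace as an argument, so it runs without redis.

```ruby
# Hypothetical helper (not part of Redstream) that mirrors the join logic
# of base_key_name, with the namespace passed in explicitly.
def base_key_for(namespace)
  # compact drops a nil namespace, so no leading ":" is produced
  [namespace, "redstream"].compact.join(":")
end

puts base_key_for(nil)      # prints "redstream"
puts base_key_for("my_app") # prints "my_app:redstream"
```

Because nil entries are removed before joining, an unset namespace yields the plain prefix rather than a key that starts with a colon.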

.connection_pool ⇒ ConnectionPool

Returns the connection pool instance, creating a new pool with default settings if none has been set yet.

Returns:

  • (ConnectionPool)

    The connection pool



# File 'lib/redstream.rb', line 40

def self.connection_pool
  @connection_pool ||= ConnectionPool.new { Redis.new }
end

.connection_pool=(connection_pool) ⇒ Object

Redstream uses the connection_pool gem to pool redis connections. If you have a distributed redis setup (sentinel/cluster) or the default pool size doesn’t match your requirements, you must specify the connection pool yourself. A connection pool is necessary because redstream uses blocking commands. Note that redis connections are fairly cheap, so it is better to make the pool large enough than to run into bottlenecks.

Examples:

Redstream.connection_pool = ConnectionPool.new(size: 50) do
  Redis.new("...")
end


# File 'lib/redstream.rb', line 31

def self.connection_pool=(connection_pool)
  @connection_pool = connection_pool
end
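The getter and setter above follow a lazy-default pattern: the default pool is only built on first access, and an explicitly assigned pool replaces it. The following sketch shows that behavior in isolation. FakePool and PoolHolder are stand-ins invented for illustration, so the code runs without the connection_pool or redis gems; they are not Redstream classes.

```ruby
# Stand-in for a real pool object (assumption: only used for illustration).
FakePool = Struct.new(:label)

# Hypothetical holder that mirrors the memoized getter and plain setter.
module PoolHolder
  class << self
    # Plain setter, like connection_pool=
    attr_writer :pool

    # Memoized getter, like connection_pool: builds the default only once
    def pool
      @pool ||= FakePool.new("default")
    end
  end
end

first = PoolHolder.pool
second = PoolHolder.pool
puts first.equal?(second) # the default is created once and reused

PoolHolder.pool = FakePool.new("custom")
puts PoolHolder.pool.label # the assigned pool replaces the default
```

Assigning the pool during application startup, before any producer or consumer runs, avoids a situation where some code has already picked up the default pool.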

.lock_key_name(name) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Generates the redis key name used for locking.

Parameters:

  • name

    A high level name for the lock

Returns:

  • (String)

    A redis key name used for locking



# File 'lib/redstream.rb', line 132

def self.lock_key_name(name)
  "#{base_key_name}:lock:#{name}"
end

.max_consumer_id(stream_name:, consumer_name:) ⇒ String?

Returns the max committed id, i.e. the consumer’s offset, for the specified consumer name.

Parameters:

  • stream_name (String)

    the stream name

  • consumer_name (String)

    the consumer name

Returns:

  • (String, nil)

    The max committed offset, or nil



# File 'lib/redstream.rb', line 96

def self.max_consumer_id(stream_name:, consumer_name:)
  connection_pool.with do |redis|
    redis.get offset_key_name(stream_name: stream_name, consumer_name: consumer_name)
  end
end

.max_stream_id(stream_name) ⇒ String?

Returns the max id of the specified stream, i.e. the id of the last/newest message added. Returns nil for empty streams.

Parameters:

  • stream_name (String)

    The stream name

Returns:

  • (String, nil)

    The id of a stream’s newest message, or nil



# File 'lib/redstream.rb', line 78

def self.max_stream_id(stream_name)
  connection_pool.with do |redis|
    message = redis.xrevrange(stream_key_name(stream_name), "+", "-", count: 1).first

    return nil unless message

    message[0]
  end
end
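The values returned by max_stream_id and max_consumer_id can be compared to check whether a consumer has caught up with a stream. The sketch below assumes both values are redis stream ids of the form "<milliseconds>-<sequence>". The helper names parse_stream_id and caught_up? are hypothetical and not part of Redstream, and the ids are passed in as plain strings so the code runs without redis.

```ruby
# Hypothetical helper: split a stream id such as "1526919030474-55"
# into its integer parts so ids compare numerically, not as strings.
def parse_stream_id(id)
  ms, seq = id.split("-", 2)
  [Integer(ms), Integer(seq || 0)]
end

# Hypothetical helper: true when the committed offset has reached the
# newest id in the stream.
def caught_up?(max_stream_id, max_consumer_id)
  return true if max_stream_id.nil?    # empty stream, nothing to consume
  return false if max_consumer_id.nil? # nothing committed yet

  # Arrays compare element by element: milliseconds first, then sequence
  (parse_stream_id(max_consumer_id) <=> parse_stream_id(max_stream_id)) >= 0
end

puts caught_up?("10-0", "9-0")  # prints "false"
puts caught_up?("10-0", "10-0") # prints "true"
```

Parsing the ids matters because a plain string comparison would order "9-0" after "10-0".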

.namespace ⇒ Object

Returns the previously set namespace for redis keys to be used by Redstream.



# File 'lib/redstream.rb', line 57

def self.namespace
  @namespace
end

.namespace=(namespace) ⇒ Object

You can specify a namespace to use for redis keys. This is useful if you are using a shared redis instance.

Examples:

Redstream.namespace = 'my_app'


# File 'lib/redstream.rb', line 50

def self.namespace=(namespace)
  @namespace = namespace
end

.offset_key_name(stream_name:, consumer_name:) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Generates the redis key name used for storing a consumer’s current offset, i.e. the maximum id successfully processed.

Parameters:

  • stream_name

    A high level stream name

  • consumer_name

    A high level consumer name

Returns:

  • (String)

    A redis key name for storing a consumer’s current offset



# File 'lib/redstream.rb', line 121

def self.offset_key_name(stream_name:, consumer_name:)
  "#{base_key_name}:offset:#{stream_name}:#{consumer_name}"
end

.stream_key_name(stream_name) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Generates the low level redis stream key name.

Parameters:

  • stream_name

    A high level stream name

Returns:

  • (String)

    A low level redis stream key name



# File 'lib/redstream.rb', line 109

def self.stream_key_name(stream_name)
  "#{base_key_name}:stream:#{stream_name}"
end

.stream_size(stream_name) ⇒ Integer

Returns the length of the specified stream.

Parameters:

  • stream_name (String)

    The stream name

Returns:

  • (Integer)

    The length of the stream



# File 'lib/redstream.rb', line 66

def self.stream_size(stream_name)
  connection_pool.with do |redis|
    redis.xlen(stream_key_name(stream_name))
  end
end