Module: Redstream
Defined in:
- lib/redstream.rb
- lib/redstream/lock.rb
- lib/redstream/model.rb
- lib/redstream/delayer.rb
- lib/redstream/message.rb
- lib/redstream/trimmer.rb
- lib/redstream/version.rb
- lib/redstream/consumer.rb
- lib/redstream/producer.rb
Defined Under Namespace
Modules: Model
Classes: Consumer, Delayer, Lock, Message, Producer, Trimmer
Constant Summary
- VERSION = "0.5.0"
Class Method Summary
- .base_key_name ⇒ Object (private)
  Returns the fully namespaced base name for redis keys.
- .connection_pool ⇒ ConnectionPool
  Returns the connection pool instance, creating and storing a new pool if none has been set yet.
- .connection_pool=(connection_pool) ⇒ Object
  Redstream uses the connection_pool gem to pool redis connections.
- .lock_key_name(name) ⇒ String (private)
  Generates the redis key name used for locking.
- .max_consumer_id(stream_name:, consumer_name:) ⇒ String?
  Returns the max committed id, i.e. the consumer’s offset, for the specified consumer name.
- .max_stream_id(stream_name) ⇒ String?
  Returns the max id of the specified stream, i.e. the id of the last/newest message added.
- .namespace ⇒ Object
  Returns the previously set namespace for redis keys to be used by Redstream.
- .namespace=(namespace) ⇒ Object
  You can specify a namespace to use for redis keys.
- .offset_key_name(stream_name:, consumer_name:) ⇒ String (private)
  Generates the redis key name used for storing a consumer’s current offset, i.e. the maximum id successfully processed.
- .stream_key_name(stream_name) ⇒ String (private)
  Generates the low level redis stream key name.
- .stream_size(stream_name) ⇒ Integer
  Returns the length of the specified stream.
Class Method Details
.base_key_name ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the fully namespaced base name for redis keys.
# File 'lib/redstream.rb', line 140
def self.base_key_name
  [namespace, "redstream"].compact.join(":")
end
.connection_pool ⇒ ConnectionPool
Returns the connection pool instance, creating and storing a new pool if none has been set yet.
# File 'lib/redstream.rb', line 40
def self.connection_pool
  @connection_pool ||= ConnectionPool.new { Redis.new }
end
.connection_pool=(connection_pool) ⇒ Object
Redstream uses the connection_pool gem to pool redis connections. If you have a distributed redis setup (sentinel/cluster), or the default pool size doesn’t match your requirements, you must specify the connection pool yourself. A connection pool is necessary because Redstream uses blocking commands. Redis connections are fairly cheap, so prefer a pool size that is large enough over running into bottlenecks.
# File 'lib/redstream.rb', line 31
def self.connection_pool=(connection_pool)
  @connection_pool = connection_pool
end
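A custom pool could be configured along the following lines. This is a minimal sketch, not a recommended configuration: the pool size, timeout and redis URL are assumptions chosen for illustration, and it presumes the connection_pool, redis and redstream gems are installed.

```ruby
require "connection_pool"
require "redis"
require "redstream"

# size, timeout and url are illustrative assumptions. Size the pool for the
# number of threads that may hold a connection in a blocking command at once.
Redstream.connection_pool = ConnectionPool.new(size: 50, timeout: 5) {
  Redis.new(url: "redis://localhost:6379/0")
}
```

Assigning the pool before the first call to Redstream.connection_pool means the default `ConnectionPool.new { Redis.new }` is never created.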
.lock_key_name(name) ⇒ String
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Generates the redis key name used for locking.
# File 'lib/redstream.rb', line 132
def self.lock_key_name(name)
  "#{base_key_name}:lock:#{name}"
end
.max_consumer_id(stream_name:, consumer_name:) ⇒ String?
Returns the max committed id, i.e. the consumer’s offset, for the specified consumer name.
# File 'lib/redstream.rb', line 96
def self.max_consumer_id(stream_name:, consumer_name:)
  connection_pool.with do |redis|
    redis.get offset_key_name(stream_name: stream_name, consumer_name: consumer_name)
  end
end
.max_stream_id(stream_name) ⇒ String?
Returns the max id of the specified stream, i.e. the id of the last/newest message added. Returns nil for empty streams.
# File 'lib/redstream.rb', line 78
def self.max_stream_id(stream_name)
  connection_pool.with do |redis|
    message = redis.xrevrange(stream_key_name(stream_name), "+", "-", count: 1).first

    return nil unless message

    message[0]
  end
end
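Both max_stream_id and max_consumer_id return redis stream ids as strings of the form `<milliseconds>-<sequence>`, or nil. Comparing such ids as plain strings can mislead, since "9-0" sorts after "10-0" lexicographically, so a check for whether a consumer has caught up would likely parse them first. The following is a minimal sketch under that assumption; `parse_stream_id` and `caught_up?` are hypothetical helpers, not part of Redstream, and they take the ids as plain strings so the example runs without redis.

```ruby
# Hypothetical helpers, not part of Redstream.

# Splits a redis stream id such as "1526919030474-55" into
# [milliseconds, sequence] as integers. Arrays of integers compare
# element by element, which matches redis' ordering of stream ids.
def parse_stream_id(id)
  ms, seq = id.split("-", 2)
  [Integer(ms, 10), Integer(seq || "0", 10)]
end

# Returns true when the consumer's committed id has reached the newest id
# in the stream. A nil stream id means the stream is empty; a nil consumer
# id means nothing has been committed yet.
def caught_up?(stream_id, consumer_id)
  return true if stream_id.nil?
  return false if consumer_id.nil?

  (parse_stream_id(consumer_id) <=> parse_stream_id(stream_id)) >= 0
end
```

In an application, the two arguments might come from `Redstream.max_stream_id("products")` and `Redstream.max_consumer_id(stream_name: "products", consumer_name: "indexer")`, where the stream and consumer names are placeholders.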
.namespace ⇒ Object
Returns the previously set namespace for redis keys to be used by Redstream.
# File 'lib/redstream.rb', line 57
def self.namespace
  @namespace
end
.namespace=(namespace) ⇒ Object
You can specify a namespace to use for redis keys. This is useful in case you are using a shared redis.
# File 'lib/redstream.rb', line 50
def self.namespace=(namespace)
  @namespace = namespace
end
.offset_key_name(stream_name:, consumer_name:) ⇒ String
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Generates the redis key name used for storing a consumer’s current offset, i.e. the maximum id successfully processed.
# File 'lib/redstream.rb', line 121
def self.offset_key_name(stream_name:, consumer_name:)
  "#{base_key_name}:offset:#{stream_name}:#{consumer_name}"
end
.stream_key_name(stream_name) ⇒ String
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Generates the low level redis stream key name.
# File 'lib/redstream.rb', line 109
def self.stream_key_name(stream_name)
  "#{base_key_name}:stream:#{stream_name}"
end
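To see how the namespace affects the resulting redis keys, the key formulas shown above can be reproduced in a standalone stand-in. This is only an illustrative sketch: `KeyNames` is a hypothetical module that copies the string templates from the listings so the example runs without redis or the gem, and the namespace, stream and consumer names are made up.

```ruby
# Hypothetical stand-in, not part of Redstream. It mirrors the key
# templates from the source listings above.
module KeyNames
  class << self
    attr_accessor :namespace

    def base_key_name
      # compact drops a nil namespace, so no leading ":" is produced
      [namespace, "redstream"].compact.join(":")
    end

    def stream_key_name(stream_name)
      "#{base_key_name}:stream:#{stream_name}"
    end

    def offset_key_name(stream_name:, consumer_name:)
      "#{base_key_name}:offset:#{stream_name}:#{consumer_name}"
    end

    def lock_key_name(name)
      "#{base_key_name}:lock:#{name}"
    end
  end
end

# Without a namespace, keys start directly with "redstream".
KeyNames.namespace = nil
puts KeyNames.stream_key_name("products")
# prints "redstream:stream:products"

# With a namespace, every key is prefixed by it.
KeyNames.namespace = "my_app"
puts KeyNames.stream_key_name("products")
# prints "my_app:redstream:stream:products"
puts KeyNames.offset_key_name(stream_name: "products", consumer_name: "indexer")
# prints "my_app:redstream:offset:products:indexer"
puts KeyNames.lock_key_name("indexer")
# prints "my_app:redstream:lock:indexer"
```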
.stream_size(stream_name) ⇒ Integer
Returns the length of the specified stream.
# File 'lib/redstream.rb', line 66
def self.stream_size(stream_name)
  connection_pool.with do |redis|
    redis.xlen(stream_key_name(stream_name))
  end
end