Class: Nagare::RedisStreams
- Inherits: Object
- Defined in: lib/nagare/redis_streams.rb
Overview
Abstraction layer over the basic Redis Streams X… commands for interacting with streams, groups, and consumers.
This class may be mocked during testing if necessary, or replaced with an implementation backed by another technology such as Kafka or ActiveMQ.
Important: Groups are always assumed to be named '<stream>-<group>'.
Consumers are always created using the hostname and thread id ('<hostname>-<thread_id>').
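The naming conventions above can be illustrated with a small standalone sketch. The helper names `full_stream_name`, `group_key`, and `consumer_name` are hypothetical and not part of Nagare; the `suffix` argument stands in for `Nagare::Config.suffix`, and taking the thread id from `Thread.current.object_id` is an assumption, since the class's own `hostname` and `thread_id` helpers are not shown on this page.

```ruby
require 'socket'

# Hypothetical helpers mirroring the naming conventions described above.
# `suffix` stands in for Nagare::Config.suffix (nil means "no suffix").
def full_stream_name(stream, suffix = nil)
  suffix.nil? ? stream.to_s : "#{stream}-#{suffix}"
end

# Group names are built from the (possibly suffixed) stream name.
def group_key(stream, group, suffix = nil)
  "#{full_stream_name(stream, suffix)}-#{group}"
end

# Assumption: the thread id comes from Thread.current.object_id.
def consumer_name(hostname = Socket.gethostname,
                  thread_id = Thread.current.object_id)
  "#{hostname}-#{thread_id}"
end

puts group_key('orders', 'billing')          # orders-billing
puts group_key('orders', 'billing', 'test')  # orders-test-billing
puts consumer_name('web-1', 42)              # web-1-42
```

Because the suffix is applied to the stream before the group name is derived, a suffixed stream also yields a suffixed group name.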
Class Method Summary
-
.claim_next_stuck_message(stream_prefix, group) ⇒ Array[Hash]
Claims the next message of the consumer group that is stuck (pending and past min_idle_time since being picked up).
-
.connection ⇒ Redis
Returns a connection to redis.
-
.create_group(stream, group) ⇒ String
Creates a group in redis for the stream using xgroup.
-
.delete_group(stream, group) ⇒ String
Deletes a group in redis for the stream using xgroup.
-
.group_exists?(stream, group) ⇒ Boolean
Determines whether a group already exists in redis or not using xinfo.
-
.mark_processed(stream, group, message_id) ⇒ Integer
Acknowledges a message as processed in the consumer group.
-
.move_to_dlq(stream, group, message) ⇒ Object
Moves a message to the dead letter queue stream.
-
.pending(stream, group) ⇒ Hash
Query pending messages for a consumer group.
-
.publish(stream, event_name, data) ⇒ String
Publishes an event to the specified stream.
-
.read_next_messages(stream, group) ⇒ Object
Reads the next messages from the consumer group in redis.
-
.read_one(stream) ⇒ Array
Reads the last message on the stream without using a consumer group.
-
.retry_count(stream, group, message_id) ⇒ Object
Uses XPENDING to verify the number of times the message was delivered.
- .stream_name(stream) ⇒ Object
-
.truncate(stream) ⇒ Integer
Empties a stream for all readers, not only the consumer group.
Class Method Details
.claim_next_stuck_message(stream_prefix, group) ⇒ Array[Hash]
Claims the next message of the consumer group that is stuck (pending and past min_idle_time since being picked up).
# File 'lib/nagare/redis_streams.rb', line 92
def claim_next_stuck_message(stream_prefix, group)
  stream = stream_name(stream_prefix)
  result = connection.xautoclaim(stream, "#{stream}-#{group}",
                                 "#{hostname}-#{thread_id}",
                                 Nagare::Config.min_idle_time, '0-0',
                                 count: 1)

  # Move message to DLQ if retried too much and get next one
  if result['entries'].any?
    message_id = result['entries'].first.first
    if retry_count(stream_prefix, group, message_id) > Nagare::Config.max_retries
      move_to_dlq(stream_prefix, group, result['entries'].first)
      return claim_next_stuck_message(stream_prefix, group)
    end
  end
  result['entries'] || []
end
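The claim-or-dead-letter decision in the listing above can be sketched without a Redis server. The following is an in-memory illustration only: `next_claimable`, `MAX_RETRIES`, and the sample data are hypothetical, and the sketch uses a loop where the original recurses.

```ruby
# In-memory illustration of the claim-or-dead-letter decision.
# `pending` is an array of [message_id, fields] pairs in claim order,
# `deliveries` maps message_id => delivery count (what XPENDING reports),
# `dlq` collects entries that exceeded the retry limit.
MAX_RETRIES = 3 # assumption; stands in for Nagare::Config.max_retries

def next_claimable(pending, deliveries, dlq)
  queue = pending.dup
  until queue.empty?
    entry = queue.shift
    message_id = entry.first
    # Same comparison as the original: only counts above the limit go to the DLQ.
    return [entry] if deliveries.fetch(message_id, 0) <= MAX_RETRIES

    dlq << entry # delivered too often: dead-letter it and look at the next one
  end
  []
end

dlq = []
pending = [['1-0', { 'order_created' => '{}' }],
           ['2-0', { 'order_paid' => '{}' }]]
deliveries = { '1-0' => 5, '2-0' => 1 }

claimed = next_claimable(pending, deliveries, dlq)
puts claimed.inspect
puts dlq.inspect
```

Here the first pending entry has been delivered five times, so it lands in `dlq`, and the second entry is returned as the claimed message.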
.connection ⇒ Redis
Returns a connection to redis. Currently not pooled
# File 'lib/nagare/redis_streams.rb', line 22
def connection
  # FIXME: Connection pool should come in handy
  @connection ||= Redis.new(url: Nagare::Config.redis_url)
end
.create_group(stream, group) ⇒ String
Creates a group in redis for the stream using xgroup
# File 'lib/nagare/redis_streams.rb', line 52
def create_group(stream, group)
  stream = stream_name(stream)
  connection.xgroup(:create, stream, "#{stream}-#{group}", '$',
                    mkstream: true)
end
.delete_group(stream, group) ⇒ String
Deletes a group in redis for the stream using xgroup
# File 'lib/nagare/redis_streams.rb', line 65
def delete_group(stream, group)
  stream = stream_name(stream)
  connection.xgroup(:destroy, stream, "#{stream}-#{group}")
end
.group_exists?(stream, group) ⇒ Boolean
Determines whether a group already exists in redis or not using xinfo
# File 'lib/nagare/redis_streams.rb', line 34
def group_exists?(stream, group)
  stream = stream_name(stream)
  info = connection.xinfo(:groups, stream.to_s)
  info.any? { |line| line['name'] == "#{stream}-#{group}" }
rescue Redis::CommandError => e
  logger.info "Seems the group doesn't exist"
  logger.info e.message
  logger.info e.backtrace.join("\n")
  false
end
.mark_processed(stream, group, message_id) ⇒ Integer
Acknowledges a message as processed in the consumer group
# File 'lib/nagare/redis_streams.rb', line 158
def mark_processed(stream, group, message_id)
  stream = stream_name(stream)
  group = "#{stream}-#{group}"

  count = connection.xack(stream, group, message_id)
  return if count == 1

  raise "Message could not be ACKed in Redis: #{stream} #{group} "\
        "#{message_id}. Return value: #{count}"
end
.move_to_dlq(stream, group, message) ⇒ Object
Moves a message to the dead letter queue stream
# File 'lib/nagare/redis_streams.rb', line 132
def move_to_dlq(stream, group, message)
  Nagare.logger.warn "Moving message to DLQ #{message} \
  from stream #{stream}"
  publish(Nagare::Config.dlq_stream, stream, message.to_json)
  mark_processed(stream, group, message.first)
end
.pending(stream, group) ⇒ Hash
Query pending messages for a consumer group
# File 'lib/nagare/redis_streams.rb', line 208
def pending(stream, group)
  stream = stream_name(stream)
  group = "#{stream}-#{group}"
  connection.xpending(stream, group)
end
.publish(stream, event_name, data) ⇒ String
Publishes an event to the specified stream
# File 'lib/nagare/redis_streams.rb', line 78
def publish(stream, event_name, data)
  stream = stream_name(stream)
  connection.xadd(stream, { "#{event_name}": data })
end
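As the listing shows, each stream entry carries a single field whose name is the event name and whose value is the data. A minimal sketch of that entry shape follows; `build_entry` is a hypothetical helper, and serializing the payload as JSON is an assumption about how callers use the method (`.move_to_dlq` does pass a JSON string).

```ruby
require 'json'

# Hypothetical helper: builds the same one-field hash that the listing
# above passes to xadd. The field name is the event name as a symbol.
def build_entry(event_name, data)
  { "#{event_name}": data }
end

# Assumption: the caller serializes the payload to JSON before publishing.
entry = build_entry('order_created', { id: 42 }.to_json)

# Decoding on the consumer side: one field per entry, name => payload.
event_name, payload = entry.first
decoded = JSON.parse(payload)

puts event_name.inspect # :order_created
puts decoded['id']      # 42
```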
.read_next_messages(stream, group) ⇒ Object
Reads the next messages from the consumer group in redis.
# File 'lib/nagare/redis_streams.rb', line 143
def read_next_messages(stream, group)
  stream = stream_name(stream)
  result = connection.xreadgroup("#{stream}-#{group}",
                                 "#{hostname}-#{thread_id}", stream, '>')
  result[stream] || []
end
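Since the overview notes that this class may be mocked during testing, the publish, read, and acknowledge cycle can be sketched against an in-memory stand-in. `FakeStreams` and `pending_count` are hypothetical and not part of Nagare. The sketch simplifies heavily: it ignores consumers, idle times, and the stream suffix, and unlike a group created with `'$'` it delivers messages published before the group first reads.

```ruby
# Hypothetical in-memory stand-in for tests; not part of Nagare.
class FakeStreams
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }  # stream => [[id, fields], ...]
    @cursor  = Hash.new(0)                    # "<stream>-<group>" => next index
    @pending = Hash.new { |h, k| h[k] = {} }  # "<stream>-<group>" => { id => fields }
  end

  # Appends an entry and returns its id, like XADD.
  def publish(stream, event_name, data)
    id = "#{@streams[stream].size + 1}-0"
    @streams[stream] << [id, { event_name.to_s => data }]
    id
  end

  # Returns entries the group has not seen yet and marks them pending.
  def read_next_messages(stream, group)
    key = "#{stream}-#{group}"
    entries = @streams[stream].drop(@cursor[key])
    @cursor[key] = @streams[stream].size
    entries.each { |id, fields| @pending[key][id] = fields }
    entries
  end

  # Removes the entry from the pending list; raises if it was not pending.
  def mark_processed(stream, group, message_id)
    removed = @pending["#{stream}-#{group}"].delete(message_id)
    raise "Message could not be ACKed: #{message_id}" if removed.nil?
  end

  def pending_count(stream, group)
    @pending["#{stream}-#{group}"].size
  end
end

streams = FakeStreams.new
id = streams.publish('orders', 'order_created', '{"id":42}')
messages = streams.read_next_messages('orders', 'billing')
messages.each do |message_id, _fields|
  # ... handle the event here, then acknowledge it ...
  streams.mark_processed('orders', 'billing', message_id)
end
```

A second group reading the same stream would receive the same entries, which mirrors how consumer groups track their position independently.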
.read_one(stream) ⇒ Array
Reads the last message on the stream without using a consumer group
# File 'lib/nagare/redis_streams.rb', line 175
def read_one(stream)
  stream = stream_name(stream)
  result = connection.xread(stream, [0], count: 1)
  result[stream]&.first
end
.retry_count(stream, group, message_id) ⇒ Object
Uses XPENDING to verify the number of times the message was delivered
# File 'lib/nagare/redis_streams.rb', line 118
def retry_count(stream, group, message_id)
  stream = stream_name(stream)
  result = connection.xpending(stream, "#{stream}-#{group}",
                               message_id, message_id, 1)
  return 0 unless result.any?

  result.first['count']
end
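The listing above queries XPENDING for a one-message range and reads the delivery counter from the first result. The sketch below applies the same extraction to sample data. The reply shape (an array of hashes with `'entry_id'`, `'consumer'`, `'elapsed'`, and `'count'` keys) is an assumption about how the Redis client presents the extended XPENDING reply; only the `'count'` key is confirmed by the listing. `delivery_count` is a hypothetical name.

```ruby
# Extracts the delivery counter from an extended XPENDING reply.
# Assumption: the client returns an array of hashes; only 'count'
# is confirmed by the original listing.
def delivery_count(pending_details)
  return 0 unless pending_details.any?

  pending_details.first['count']
end

# Sample data, made up for illustration.
sample = [
  { 'entry_id' => '1-0', 'consumer' => 'web-1-42',
    'elapsed' => 61_000, 'count' => 4 }
]

puts delivery_count(sample) # 4
puts delivery_count([])     # 0 (message is no longer pending)
```

An empty reply means the message is not in the group's pending list, for example because it was already acknowledged, so the count falls back to zero.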
.stream_name(stream) ⇒ Object
# File 'lib/nagare/redis_streams.rb', line 190
def stream_name(stream)
  suffix = Nagare::Config.suffix
  if suffix.nil?
    stream
  else
    "#{stream}-#{suffix}"
  end
end
.truncate(stream) ⇒ Integer
Empties a stream for all readers, not only the consumer group
# File 'lib/nagare/redis_streams.rb', line 185
def truncate(stream)
  stream = stream_name(stream)
  connection.xtrim(stream, 0)
end