Class: Nagare::RedisStreams

Inherits:
Object
Defined in:
lib/nagare/redis_streams.rb

Overview

Abstraction layer over the basic Redis Streams X… commands for working with streams, groups and consumers.

This module may be mocked during testing if necessary, or replaced with an implementation based on another technology, such as Kafka or ActiveMQ.
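
As a rough illustration of the mocking idea, a test could swap in an in-memory stand-in. The sketch below is hypothetical: `FakeStreams` is not part of Nagare, it mirrors only three of the documented methods, and its ids are simple counters rather than real Redis stream ids.

```ruby
# Hypothetical in-memory stand-in that mirrors a few of the documented
# class methods. It is not part of Nagare; ids are simplified counters.
class FakeStreams
  class << self
    # Lazily created store: stream name => array of [id, event] tuples.
    def streams
      @streams ||= Hash.new { |hash, key| hash[key] = [] }
    end

    # Mirrors .publish: appends an entry and returns its id.
    def publish(stream, event_name, data)
      id = "#{streams[stream].size + 1}-0"
      streams[stream] << [id, { event_name.to_s => data }]
      id
    end

    # Mirrors .read_one: returns the first [id, event] tuple, or nil.
    def read_one(stream)
      streams[stream].first
    end

    # Mirrors .truncate: empties the stream, returns the entries removed.
    def truncate(stream)
      removed = streams[stream].size
      streams[stream].clear
      removed
    end
  end
end

FakeStreams.publish('orders', 'order_created', '{"id":1}')
p FakeStreams.read_one('orders')
```

Code under test would then receive `FakeStreams` wherever it would otherwise call the real class, which keeps unit tests independent of a running Redis server.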

Important: Groups are always assumed to be named '<stream>-<group>'.

Consumers are always named using the hostname plus the thread id ('<hostname>-<thread id>').
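
The two naming conventions can be sketched in plain Ruby. The helper names `group_key` and `consumer_name` are hypothetical, and using `Socket.gethostname` with `Thread.current.object_id` is an assumption about how the hostname and thread id are obtained; the class's own helpers are not shown on this page.

```ruby
require 'socket'

# Hypothetical helper: builds the group name following the documented
# convention '<stream>-<group>'.
def group_key(stream, group)
  "#{stream}-#{group}"
end

# Hypothetical helper: one consumer name per host and thread.
# Assumption: hostname from Socket, thread id from object_id.
def consumer_name
  "#{Socket.gethostname}-#{Thread.current.object_id}"
end

puts group_key('orders', 'billing')
puts consumer_name
```

Because the consumer name includes the thread id, every worker thread on a host shows up as its own consumer inside the group.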


Class Method Details

.claim_next_stuck_message(stream_prefix, group) ⇒ Array[Hash]

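Claims the next stuck message of the consumer group, that is, a message that is still pending and has been idle for longer than min_idle_time since it was picked up. A message delivered more than max_retries times is moved to the dead letter queue, and the next stuck message is claimed instead.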


Parameters:

  • stream_prefix (String)

    name of the stream

  • group (String)

    name of the consumer group

Returns:

  • (Array[Hash])

    array containing the single claimed message, or an empty array if no message is stuck



# File 'lib/nagare/redis_streams.rb', line 92

def claim_next_stuck_message(stream_prefix, group)
  stream = stream_name(stream_prefix)
  result = connection.xautoclaim(stream,
                                 "#{stream}-#{group}",
                                 "#{hostname}-#{thread_id}",
                                 Nagare::Config.min_idle_time,
                                 '0-0',
                                 count: 1)

  # Move message to DLQ if retried too much and get next one
  if result['entries'].any?
    message_id = result['entries'].first.first
    if retry_count(stream_prefix, group,
                   message_id) > Nagare::Config.max_retries
      move_to_dlq(stream_prefix, group, result['entries'].first)
      return claim_next_stuck_message(stream_prefix, group)
    end
  end

  result['entries'] || []
end
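
The retry and dead-letter decision in the listing above can be sketched without Redis. Everything here is hypothetical illustration: the `next_retryable` name, the `[id, payload, delivery_count]` tuples, and the `MAX_RETRIES` constant stand in for what the class reads from XAUTOCLAIM, XPENDING and `Nagare::Config.max_retries`. The sketch uses a loop where the original recurses.

```ruby
# Assumption: stands in for Nagare::Config.max_retries.
MAX_RETRIES = 3

# Hypothetical helper. `pending` is an array of
# [id, payload, delivery_count] tuples, oldest first.
# Returns [next_message_or_nil, dead_lettered_ids]: entries delivered
# more than max_retries times are skipped (and reported for the DLQ),
# and the first entry that may still be retried is returned.
def next_retryable(pending, max_retries: MAX_RETRIES)
  dead = []
  pending.each do |id, payload, deliveries|
    if deliveries > max_retries
      dead << id
    else
      return [[id, payload], dead]
    end
  end
  [nil, dead]
end

pending = [
  ['1-0', { 'order_created' => '{"id":1}' }, 5],
  ['2-0', { 'order_created' => '{"id":2}' }, 1]
]
message, dead = next_retryable(pending)
p message
p dead
```

A loop avoids deep recursion if many exhausted messages sit in a row; the recursive form in the listing is equivalent for short runs.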

.connection ⇒ Redis

Returns a connection to Redis. Connections are currently not pooled.

Returns:

  • (Redis)

    a connection to redis from the redis-rb gem



# File 'lib/nagare/redis_streams.rb', line 22

def connection
  # FIXME: Connection pool should come in handy
  @connection ||= Redis.new(url: Nagare::Config.redis_url)
end

.create_group(stream, group) ⇒ String

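Creates a consumer group in Redis for the stream using XGROUP CREATE. The group starts at '$', so it only receives messages added after its creation, and the stream is created if it does not exist yet (mkstream: true).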

Parameters:

  • stream (String)

    name of the stream

  • group (String)

    name of the group

Returns:

  • (String)

    OK



# File 'lib/nagare/redis_streams.rb', line 52

def create_group(stream, group)
  stream = stream_name(stream)
  connection.xgroup(:create, stream, "#{stream}-#{group}", '$',
                    mkstream: true)
end

.delete_group(stream, group) ⇒ String

Deletes a consumer group in Redis for the stream using XGROUP DESTROY.

Parameters:

  • stream (String)

    name of the stream

  • group (String)

    name of the group

Returns:

  • (String)

    OK



# File 'lib/nagare/redis_streams.rb', line 65

def delete_group(stream, group)
  stream = stream_name(stream)
  connection.xgroup(:destroy, stream, "#{stream}-#{group}")
end

.group_exists?(stream, group) ⇒ Boolean

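Determines whether a group already exists in Redis, using XINFO GROUPS. A Redis::CommandError (for example, when the stream itself does not exist) is logged and treated as "group does not exist".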

Parameters:

  • stream (String)

    name of the stream

  • group (String)

    name of the group

Returns:

  • (Boolean)

    true if the group exists, otherwise false



# File 'lib/nagare/redis_streams.rb', line 34

def group_exists?(stream, group)
  stream = stream_name(stream)
  info = connection.xinfo(:groups, stream.to_s)
  info.any? { |line| line['name'] == "#{stream}-#{group}" }
rescue Redis::CommandError => e
  logger.info "Seems the group doesn't exist"
  logger.info e.message
  logger.info e.backtrace.join("\n")
  false
end
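
The name match in the listing above can be tried on sample data. The array below is made up and only shaped like a hashified XINFO GROUPS reply (one hash per group with a 'name' key, which is the only key the listing relies on); `group_listed?` is a hypothetical helper name.

```ruby
# Illustrative data shaped like an XINFO GROUPS reply after
# hashification. Values are made up.
info = [
  { 'name' => 'orders-billing', 'consumers' => 2, 'pending' => 0 },
  { 'name' => 'orders-shipping', 'consumers' => 1, 'pending' => 4 }
]

# Hypothetical helper: same comparison as group_exists?, applied to
# an already fetched reply.
def group_listed?(info, stream, group)
  info.any? { |line| line['name'] == "#{stream}-#{group}" }
end

p group_listed?(info, 'orders', 'billing')
p group_listed?(info, 'orders', 'audit')
```

Note that the comparison uses the full '<stream>-<group>' name, so passing an already prefixed group name would not match.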

.mark_processed(stream, group, message_id) ⇒ Integer

Acknowledges a message as processed in the consumer group using XACK.

Parameters:

  • stream (String)

    name of the stream

  • group (String)

    name of the group

  • message_id (String)

    the id of the message

Returns:

  • (Integer)

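    number of messages acknowledged. Note that the implementation shown below returns nil when exactly one message was acknowledged and raises a RuntimeError otherwise.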



# File 'lib/nagare/redis_streams.rb', line 158

def mark_processed(stream, group, message_id)
  stream = stream_name(stream)
  group = "#{stream}-#{group}"

  count = connection.xack(stream, group, message_id)
  return if count == 1

  raise "Message could not be ACKed in Redis: #{stream} #{group} "\
    "#{message_id}. Return value: #{count}"
end
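
Since the listing raises when the acknowledgement does not succeed, a caller may want to separate handler failures from acknowledgement failures. The sketch below is one hypothetical way to do that; `StubStreams` and `process_and_ack` are invented for illustration, and the stub fails for the made-up id 'gone'.

```ruby
# Hypothetical stub standing in for Nagare::RedisStreams. It raises
# for the made-up id 'gone' to imitate a failed acknowledgement.
class StubStreams
  def self.mark_processed(_stream, _group, message_id)
    return unless message_id == 'gone'

    raise "Message could not be ACKed in Redis: #{message_id}"
  end
end

# Hypothetical helper: runs the handler block first and acknowledges
# only afterwards. Only errors from the acknowledgement are rescued.
def process_and_ack(streams, stream, group, message_id)
  yield
  begin
    streams.mark_processed(stream, group, message_id)
    :acked
  rescue RuntimeError => e
    warn "ack failed: #{e.message}"
    :ack_failed
  end
end

p process_and_ack(StubStreams, 'orders', 'billing', '1-0') { :handled }
p process_and_ack(StubStreams, 'orders', 'billing', 'gone') { :handled }
```

A message whose acknowledgement failed stays pending, so it would later become a candidate for claim_next_stuck_message.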

.move_to_dlq(stream, group, message) ⇒ Object

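Moves a message to the dead letter queue (DLQ): publishes it to the stream configured in Nagare::Config.dlq_stream, then acknowledges it in the original consumer group.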



# File 'lib/nagare/redis_streams.rb', line 132

def move_to_dlq(stream, group, message)
  # Adjacent string literals keep the indentation out of the log line
  Nagare.logger.warn "Moving message to DLQ #{message} " \
                     "from stream #{stream}"
  publish(Nagare::Config.dlq_stream, stream, message.to_json)
  mark_processed(stream, group, message.first)
end

.pending(stream, group) ⇒ Hash

Query pending messages for a consumer group

Returns:

  • (Hash)

    { "size"=>0, "min_entry_id"=>nil, "max_entry_id"=>nil, "consumers"=>{} }



# File 'lib/nagare/redis_streams.rb', line 208

def pending(stream, group)
  stream = stream_name(stream)
  group = "#{stream}-#{group}"
  connection.xpending(stream, group)
end

.publish(stream, event_name, data) ⇒ String

Publishes an event to the specified stream

Parameters:

  • stream (String)

    name of the stream

  • event_name (String)

    key of the event

  • data (String)

    data for the event, usually in JSON format.

Returns:

  • (String)

    message id



# File 'lib/nagare/redis_streams.rb', line 78

def publish(stream, event_name, data)
  stream = stream_name(stream)
  connection.xadd(stream, { "#{event_name}": data })
end
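
The hash literal passed to xadd in the listing above uses Ruby's quoted-symbol key syntax, so the event name becomes a Symbol key holding the data string. A small sketch with made-up values shows the resulting structure:

```ruby
# Made-up example values.
event_name = 'order_created'
data = '{"id":1}'

# Same literal form as in the listing: "#{...}": builds a Symbol key.
fields = { "#{event_name}": data }

p fields.keys.first.class
p fields == { event_name.to_sym => data }
```

Each published entry therefore carries exactly one field, named after the event, whose value is the payload string.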

.read_next_messages(stream, group) ⇒ Object

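Reads the next messages from the consumer group in Redis. It uses XREADGROUP with the '>' id, so only messages not yet delivered to any consumer of the group are returned. Returns an empty array when there is nothing new.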



# File 'lib/nagare/redis_streams.rb', line 143

def read_next_messages(stream, group)
  stream = stream_name(stream)
  result = connection.xreadgroup("#{stream}-#{group}",
                                 "#{hostname}-#{thread_id}", stream, '>')
  result[stream] || []
end

.read_one(stream) ⇒ Array

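Reads one message from the stream without using a consumer group. Note that the implementation shown below reads from id 0 with count: 1, which returns the oldest entry still in the stream rather than the most recent one.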

Parameters:

  • stream (String)

    name of the stream

Returns:

  • (Array)

    tuple of [message-id, event]



# File 'lib/nagare/redis_streams.rb', line 175

def read_one(stream)
  stream = stream_name(stream)
  result = connection.xread(stream, [0], count: 1)
  result[stream]&.first
end
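
The returned [message-id, event] tuple can be unpacked as sketched below. The `result` hash is made-up sample data shaped like the reply the listing indexes into (stream name mapped to an array of tuples), and treating the payload as JSON is an assumption based on the publish documentation.

```ruby
require 'json'

# Made-up sample shaped like the reply indexed in read_one:
# stream name => array of [message-id, event] tuples.
result = {
  'orders' => [['1700000000000-0', { 'order_created' => '{"id":1}' }]]
}

# Same access pattern as the listing; nil when the stream is absent.
message_id, event = result['orders']&.first

# Each event holds a single field: event name => payload string.
event_name, payload = event.first
data = JSON.parse(payload)

puts message_id
puts event_name
p data
p result['missing']&.first
```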

.retry_count(stream, group, message_id) ⇒ Object

Uses XPENDING to look up how many times the message has been delivered. Returns 0 if the message is not pending in the group.



# File 'lib/nagare/redis_streams.rb', line 118

def retry_count(stream, group, message_id)
  stream = stream_name(stream)
  result = connection.xpending(stream,
                               "#{stream}-#{group}",
                               message_id,
                               message_id,
                               1)
  return 0 unless result.any?

  result.first['count']
end

.stream_name(stream) ⇒ Object

Returns the stream name with the suffix from Nagare::Config.suffix appended as '<stream>-<suffix>', or the name unchanged when no suffix is configured.



# File 'lib/nagare/redis_streams.rb', line 190

def stream_name(stream)
  suffix = Nagare::Config.suffix
  if suffix.nil?
    stream
  else
    "#{stream}-#{suffix}"
  end
end
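
Because the other methods call stream_name before building the group name, a configured suffix ends up inside the group name as well. The sketch below illustrates this with hypothetical helpers that take the suffix as a parameter in place of `Nagare::Config.suffix`.

```ruby
# Hypothetical helper: same logic as stream_name, with the suffix
# passed in as a parameter (assumption for illustration).
def suffixed_stream(stream, suffix)
  suffix.nil? ? stream : "#{stream}-#{suffix}"
end

# Hypothetical helper: group name as built in create_group, where the
# suffixed stream name is used as the prefix.
def suffixed_group(stream, group, suffix)
  "#{suffixed_stream(stream, suffix)}-#{group}"
end

puts suffixed_stream('orders', nil)
puts suffixed_stream('orders', 'test')
puts suffixed_group('orders', 'billing', 'test')
```

With a suffix of 'test', the stream 'orders' and group 'billing' map to 'orders-test' and 'orders-test-billing', which keeps suffixed environments isolated from each other.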

.truncate(stream) ⇒ Integer

Empties a stream for all readers, not only the consumer group, by trimming it to a maximum length of 0 with XTRIM.

Returns:

  • (Integer)

    the number of entries actually deleted



# File 'lib/nagare/redis_streams.rb', line 185

def truncate(stream)
  stream = stream_name(stream)
  connection.xtrim(stream, 0)
end