Class: Redis::Stream::Wrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/stream/wrapper.rb,
lib/redis/stream/wrapper/message.rb,
lib/redis/stream/wrapper/version.rb,
lib/redis/stream/wrapper/exceptions.rb

Defined Under Namespace

Classes: Message, StreamReadError

Constant Summary collapse

VERSION =
"0.1.2"

Instance Method Summary collapse

Constructor Details

#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper

Creates a new instance of a Stream wrapper.

Parameters:

  • redis
    • An instance of Redis.

  • read_timeout_ms (defaults to: 1000)
    • The read timeout, in milliseconds, used as the default blocking time of each stream read.



11
12
13
14
15
# File 'lib/redis/stream/wrapper.rb', line 11

# Builds a wrapper around an existing Redis connection.
#
# The wrapper starts in the "not listening" state.
#
# @param redis the Redis client every stream command is sent through
# @param read_timeout_ms value stored as the default +:block+ option for reads
def initialize(redis, read_timeout_ms = 1000)
  @listening = false
  @redis = redis
  @read_timeout_ms = read_timeout_ms
end

Instance Method Details

#ack_message(group, message) ⇒ Object

ACK stream message.

Parameters:

  • group
    • The consumer group acknowledging the message

  • message
    • The message to acknowledge



78
79
80
# File 'lib/redis/stream/wrapper.rb', line 78

# Acknowledges a message on behalf of a consumer group.
#
# @param group the consumer group acknowledging the message
# @param message object responding to +stream+ and +id+
# @return the value returned by the client's +xack+ call
def ack_message(group, message)
  stream_key = message.stream
  entry_id = message.id
  @redis.xack(stream_key, group, entry_id)
end

#add_message(message) ⇒ Object

Adds a new message to the stream.

Parameters:

  • message
    • The message to add to the stream

Returns:

    • Message with new id (if default was used)



30
31
32
# File 'lib/redis/stream/wrapper.rb', line 30

# Appends a message to its stream.
#
# The id returned by +xadd+ is handed to +copy_message+ together with the
# original message, and that helper's result is returned.
#
# @param message object responding to +stream+, +payload+ and +id+
# @return the result of +copy_message+ for the original message and the id from +xadd+
def add_message(message)
  assigned_id = @redis.xadd(message.stream, message.payload, id: message.id)
  copy_message(message, assigned_id)
end

#clear_stream!(stream_name) ⇒ Object

Deletes the stream.

Parameters:

  • stream_name
    • The name of the stream to delete



21
22
23
# File 'lib/redis/stream/wrapper.rb', line 21

# Deletes the stream by removing its key through the client's +del+ call.
#
# @param stream_name the name (key) of the stream to delete
# @return the value returned by +del+
def clear_stream!(stream_name)
  @redis.del(stream_name)
end

#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object

Creates a consumer group on a stream.

Parameters:

  • name
    • The group name

  • stream
    • The concerned stream

  • start (defaults to: '$')
    • The start stream ($ is only new messages)

  • create_default_stream (defaults to: true)
    • Bool to create a stream if it does not exist



97
98
99
# File 'lib/redis/stream/wrapper.rb', line 97

# Creates a consumer group on a stream.
#
# @param name the group name
# @param stream the stream the group is attached to
# @param start the id the group starts reading from (defaults to '$')
# @param create_default_stream forwarded as the +mkstream+ option
# @return the value returned by the client's +xgroup+ call
def create_group(name, stream, start = '$', create_default_stream = true)
  create_args = [:create, stream, name, start]
  @redis.xgroup(*create_args, mkstream: create_default_stream)
end

#delete_group(name, stream) ⇒ Object

Deletes a consumer group from a stream.

Parameters:

  • name
    • The group name

  • stream
    • The concerned stream



116
117
118
# File 'lib/redis/stream/wrapper.rb', line 116

# Destroys a consumer group on a stream.
#
# @param name the group name
# @param stream the stream the group belongs to
# @return the value returned by the client's +xgroup+ call
def delete_group(name, stream)
  subcommand = :destroy
  @redis.xgroup(subcommand, stream, name)
end

#delete_group_consumer(name, stream, consumer) ⇒ Object

Deletes a consumer from a stream's consumer group.

Parameters:

  • name
    • The group name

  • stream
    • The concerned stream

  • consumer
    • The consumer name



126
127
128
# File 'lib/redis/stream/wrapper.rb', line 126

# Removes one consumer from a consumer group.
#
# @param name the group name
# @param stream the stream the group belongs to
# @param consumer the consumer name to remove
# @return the value returned by the client's +xgroup+ call
def delete_group_consumer(name, stream, consumer)
  subcommand = :delconsumer
  @redis.xgroup(subcommand, stream, name, consumer)
end

#delete_message(message) ⇒ Object

Delete stream message.

Parameters:

  • message
    • The message to delete from the stream



86
87
88
# File 'lib/redis/stream/wrapper.rb', line 86

# Removes a single message from its stream.
#
# @param message object responding to +stream+ and +id+
# @return the value returned by the client's +xdel+ call
def delete_message(message)
  stream_key = message.stream
  entry_id = message.id
  @redis.xdel(stream_key, entry_id)
end

#info(type, key, group = nil) ⇒ Object

Get info about streams / groups and consumers.

Parameters:

  • type
    • The type

  • key
    • The concerned stream / group name

  • group (defaults to: nil)
    • The group name for consumer type



107
108
109
# File 'lib/redis/stream/wrapper.rb', line 107

# Fetches information about a stream, its groups or its consumers.
#
# @param type the kind of information requested, passed through to +xinfo+
# @param key the stream (or group) the request is about
# @param group the group name, used for consumer lookups (defaults to nil)
# @return the value returned by the client's +xinfo+ call
def info(type, key, group = nil)
  lookup = [type, key, group]
  @redis.xinfo(*lookup)
end

#listen(group, consumer_name, streams, opts = {}) ⇒ Object

Starts reading stream messages in a loop, yielding each message to the given block.

Parameters:

  • group
    • A group to read stream

  • consumer_name
    • A consumer name

  • streams
    • A hash of ‘stream_name’ => ‘stream_begin_value’

  • opts (defaults to: {})
    • A hash of options

Raises:



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/redis/stream/wrapper.rb', line 41

# Reads from the given streams in a loop as a member of a consumer group,
# yielding every parsed message to the caller's block until the
# +@listening+ flag is cleared.
#
# @param group the consumer group to read with
# @param consumer_name the consumer name within the group
# @param streams a hash whose keys are stream names and whose values are
#   the ids to read from
# @param opts options forwarded to +xreadgroup+ as keyword arguments;
#   +:block+ falls back to the configured read timeout when absent or nil.
#   The caller's hash is not modified.
# @yield [message] each element produced by +parse_read_response+
# @raise [StreamReadError] if this wrapper is already listening
def listen(group, consumer_name, streams, opts = {})
  # Checked before the begin/ensure below so that a rejected second call
  # does not reset the flag of the listener that is already running.
  raise StreamReadError, "Already listening [#{streams.keys.join(', ')}] stream" if @listening

  @listening = true
  # Work on a copy so the caller's options hash is left untouched.
  read_opts = opts[:block].nil? ? opts.merge(block: @read_timeout_ms) : opts
  begin
    while @listening
      # Options must be splatted: the client takes them as keyword
      # arguments, and a positional Hash is rejected on Ruby 3.
      results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, **read_opts)
      next unless results

      parse_read_response(results).each do |message|
        yield message
      end
    end
  ensure
    # Leave the wrapper reusable when the loop exits through an exception
    # or a `break` from the caller's block.
    @listening = false
  end
end

#read(group, consumer_name, streams, opts = {}) ⇒ Object

Reads stream messages once and returns them.

Parameters:

  • group
    • A group to read stream

  • consumer_name
    • A consumer name

  • streams
    • A hash of ‘stream_name’ => ‘stream_begin_value’

  • opts (defaults to: {})
    • A hash of options



63
64
65
66
67
68
69
70
71
# File 'lib/redis/stream/wrapper.rb', line 63

# Performs a single consumer-group read on the given streams and returns
# the parsed messages.
#
# @param group the consumer group to read with
# @param consumer_name the consumer name within the group
# @param streams a hash whose keys are stream names and whose values are
#   the ids to read from
# @param opts options forwarded to +xreadgroup+ as keyword arguments;
#   +:block+ falls back to the configured read timeout when absent or nil.
#   The caller's hash is not modified.
# @return [Array, nil] the messages produced by +parse_read_response+, or
#   nil when the client returned nothing
def read(group, consumer_name, streams, opts = {})
  # Work on a copy so the caller's options hash is left untouched.
  read_opts = opts[:block].nil? ? opts.merge(block: @read_timeout_ms) : opts
  # Options must be splatted: the client takes them as keyword arguments,
  # and a positional Hash is rejected on Ruby 3.
  results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, **read_opts)
  return unless results

  parse_read_response(results).to_a
end

#stop_listening ⇒ Object

Stops reading message stream(s)



132
133
134
# File 'lib/redis/stream/wrapper.rb', line 132

# Clears the +@listening+ flag.
#
# The loop in #listen is conditioned on this flag (+while @listening+), so
# it stops at its next check of the condition rather than immediately.
#
# @return [false]
def stop_listening
  @listening = false
end