Class: Redis::Stream::Wrapper
- Inherits:
-
Object
- Object
- Redis::Stream::Wrapper
- Defined in:
- lib/redis/stream/wrapper.rb,
lib/redis/stream/wrapper/message.rb,
lib/redis/stream/wrapper/version.rb,
lib/redis/stream/wrapper/exceptions.rb
Defined Under Namespace
Classes: Message, StreamReadError
Constant Summary collapse
- VERSION =
"0.1.2"
Instance Method Summary collapse
-
#ack_message(group, message) ⇒ Object
ACK stream message.
-
#add_message(message) ⇒ Object
Adds a new message to the stream.
-
#clear_stream!(stream_name) ⇒ Object
Deletes the stream.
-
#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object
Create group stream message.
-
#delete_group(name, stream) ⇒ Object
Delete group stream message.
-
#delete_group_consumer(name, stream, consumer) ⇒ Object
Delete group stream message.
-
#delete_message(message) ⇒ Object
Delete stream message.
-
#info(type, key, group = nil) ⇒ Object
Get info about streams / groups and consumers.
-
#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper
constructor
Creates a new instance if a Stream.
-
#listen(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages looping.
-
#read(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages.
-
#stop_listening ⇒ Object
Stops reading message stream(s).
Constructor Details
#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper
Creates a new instance if a Stream.
11 12 13 14 15 |
# File 'lib/redis/stream/wrapper.rb', line 11 def initialize(redis, read_timeout_ms = 1000) @redis = redis @listening = false @read_timeout_ms = read_timeout_ms end |
Instance Method Details
#ack_message(group, message) ⇒ Object
ACK stream message.
78 79 80 |
# File 'lib/redis/stream/wrapper.rb', line 78 def (group, ) @redis.xack(.stream, group, .id) end |
#add_message(message) ⇒ Object
Adds a new message to the stream.
30 31 32 |
# File 'lib/redis/stream/wrapper.rb', line 30 def () (, @redis.xadd(.stream, .payload, id: .id)) end |
#clear_stream!(stream_name) ⇒ Object
Deletes the stream.
21 22 23 |
# File 'lib/redis/stream/wrapper.rb', line 21 def clear_stream!(stream_name) @redis.del(stream_name) end |
#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object
Create group stream message.
97 98 99 |
# File 'lib/redis/stream/wrapper.rb', line 97 def create_group(name, stream, start = '$', create_default_stream = true) @redis.xgroup(:create, stream, name, start, mkstream: create_default_stream) end |
#delete_group(name, stream) ⇒ Object
Delete group stream message.
116 117 118 |
# File 'lib/redis/stream/wrapper.rb', line 116 def delete_group(name, stream) @redis.xgroup(:destroy, stream, name) end |
#delete_group_consumer(name, stream, consumer) ⇒ Object
Delete group stream message.
126 127 128 |
# File 'lib/redis/stream/wrapper.rb', line 126 def delete_group_consumer(name, stream, consumer) @redis.xgroup(:delconsumer, stream, name, consumer) end |
#delete_message(message) ⇒ Object
Delete stream message.
86 87 88 |
# File 'lib/redis/stream/wrapper.rb', line 86 def () @redis.xdel(.stream, .id) end |
#info(type, key, group = nil) ⇒ Object
Get info about streams / groups and consumers.
107 108 109 |
# File 'lib/redis/stream/wrapper.rb', line 107 def info(type, key, group = nil) @redis.xinfo(type, key, group) end |
#listen(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages looping
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/redis/stream/wrapper.rb', line 41 def listen(group, consumer_name, streams, opts = {}) raise StreamReadError, "Already listening [#{stream}] stream" if @listening @listening = true opts[:block] = @read_timeout_ms if opts[:block].nil? while @listening results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts) next unless results parse_read_response(results).each do || yield end end end |
#read(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages.
63 64 65 66 67 68 69 70 71 |
# File 'lib/redis/stream/wrapper.rb', line 63 def read(group, consumer_name, streams, opts = {}) opts[:block] = @read_timeout_ms if opts[:block].nil? results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts) return unless results parse_read_response(results).each.map do || end end |
#stop_listening ⇒ Object
Stops reading message stream(s)
132 133 134 |
# File 'lib/redis/stream/wrapper.rb', line 132 def stop_listening @listening = false end |