Module: Redis::Commands::Streams
- Included in:
- Redis::Commands
- Defined in:
- lib/redis/commands/streams.rb
Instance Method Summary collapse
-
#xack(key, group, *ids) ⇒ Integer
Removes one or multiple entries from the pending entries list of a stream consumer group.
-
#xadd(key, entry, approximate: nil, maxlen: nil, nomkstream: nil, id: '*') ⇒ String
Add new entry to the stream.
-
#xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) ⇒ Hash{String => Hash}, Array<String>
Transfers ownership of pending stream entries that match the specified criteria.
-
#xclaim(key, group, consumer, min_idle_time, *ids, **opts) ⇒ Hash{String => Hash}, Array<String>
Changes the ownership of a pending entry.
-
#xdel(key, *ids) ⇒ Integer
Delete entries by entry ids.
-
#xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) ⇒ String, Integer
Manages the consumer group of the stream.
-
#xinfo(subcommand, key, group = nil) ⇒ Hash+
Returns the stream information each subcommand.
-
#xlen(key) ⇒ Integer
Returns the number of entries inside a stream.
-
#xpending(key, group, *args, idle: nil) ⇒ Hash+
Fetches not acknowledging pending entries.
-
#xrange(key, start = '-', range_end = '+', count: nil) ⇒ Array<Array<String, Hash>>
Fetches entries of the stream in ascending order.
-
#xread(keys, ids, count: nil, block: nil) ⇒ Hash{String => Hash{String => Hash}}
Fetches entries from one or multiple streams.
-
#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) ⇒ Hash{String => Hash{String => Hash}}
Fetches a subset of the entries from one or multiple streams related with the consumer group.
-
#xrevrange(key, range_end = '+', start = '-', count: nil) ⇒ Array<Array<String, Hash>>
Fetches entries of the stream in descending order.
-
#xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil) ⇒ Integer
Trims older entries of the stream if needed.
Instance Method Details
#xack(key, group, *ids) ⇒ Integer
Removes one or multiple entries from the pending entries list of a stream consumer group.
266 267 268 269 |
# File 'lib/redis/commands/streams.rb', line 266 def xack(key, group, *ids) args = [:xack, key, group].concat(ids.flatten) send_command(args) end |
#xadd(key, entry, approximate: nil, maxlen: nil, nomkstream: nil, id: '*') ⇒ String
Add new entry to the stream.
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/redis/commands/streams.rb', line 49 def xadd(key, entry, approximate: nil, maxlen: nil, nomkstream: nil, id: '*') args = [:xadd, key] args << 'NOMKSTREAM' if nomkstream if maxlen args << "MAXLEN" args << "~" if approximate args << maxlen end args << id args.concat(entry.flatten) send_command(args) end |
#xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) ⇒ Hash{String => Hash}, Array<String>
Transfers ownership of pending stream entries that match the specified criteria.
336 337 338 339 340 341 342 343 344 |
# File 'lib/redis/commands/streams.rb', line 336 def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) args = [:xautoclaim, key, group, consumer, min_idle_time, start] if count args << 'COUNT' << count.to_s end args << 'JUSTID' if justid blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim send_command(args, &blk) end |
#xclaim(key, group, consumer, min_idle_time, *ids, **opts) ⇒ Hash{String => Hash}, Array<String>
Changes the ownership of a pending entry
303 304 305 306 307 308 309 310 311 312 |
# File 'lib/redis/commands/streams.rb', line 303 def xclaim(key, group, consumer, min_idle_time, *ids, **opts) args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten) args.concat(['IDLE', opts[:idle].to_i]) if opts[:idle] args.concat(['TIME', opts[:time].to_i]) if opts[:time] args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount] args << 'FORCE' if opts[:force] args << 'JUSTID' if opts[:justid] blk = opts[:justid] ? Noop : HashifyStreamEntries send_command(args, &blk) end |
#xdel(key, *ids) ⇒ Integer
Delete entries by entry ids.
106 107 108 109 |
# File 'lib/redis/commands/streams.rb', line 106 def xdel(key, *ids) args = [:xdel, key].concat(ids.flatten) send_command(args) end |
#xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) ⇒ String, Integer
Manages the consumer group of the stream.
214 215 216 217 |
# File 'lib/redis/commands/streams.rb', line 214 def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact send_command(args) end |
#xinfo(subcommand, key, group = nil) ⇒ Hash+
Returns the stream information each subcommand.
22 23 24 25 26 27 28 29 30 |
# File 'lib/redis/commands/streams.rb', line 22 def xinfo(subcommand, key, group = nil) args = [:xinfo, subcommand, key, group].compact block = case subcommand.to_s.downcase when 'stream' then Hashify when 'groups', 'consumers' then proc { |r| r.map(&Hashify) } end send_command(args, &block) end |
#xlen(key) ⇒ Integer
Returns the number of entries inside a stream.
165 166 167 |
# File 'lib/redis/commands/streams.rb', line 165 def xlen(key) send_command([:xlen, key]) end |
#xpending(key, group, *args, idle: nil) ⇒ Hash+
Fetches not acknowledging pending entries
368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/redis/commands/streams.rb', line 368 def xpending(key, group, *args, idle: nil) command_args = [:xpending, key, group] command_args << 'IDLE' << Integer(idle) if idle case args.size when 0, 3, 4 command_args.concat(args) else raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)" end summary_needed = args.empty? blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails send_command(command_args, &blk) end |
#xrange(key, start = '-', range_end = '+', count: nil) ⇒ Array<Array<String, Hash>>
Fetches entries of the stream in ascending order.
128 129 130 131 132 |
# File 'lib/redis/commands/streams.rb', line 128 def xrange(key, start = '-', range_end = '+', count: nil) args = [:xrange, key, start, range_end] args.concat(['COUNT', count]) if count send_command(args, &HashifyStreamEntries) end |
#xread(keys, ids, count: nil, block: nil) ⇒ Hash{String => Hash{String => Hash}}
Fetches entries from one or multiple streams. Optionally blocking.
186 187 188 189 190 191 |
# File 'lib/redis/commands/streams.rb', line 186 def xread(keys, ids, count: nil, block: nil) args = [:xread] args << 'COUNT' << count if count args << 'BLOCK' << block.to_i if block _xread(args, keys, ids, block) end |
#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) ⇒ Hash{String => Hash{String => Hash}}
Fetches a subset of the entries from one or multiple streams related with the consumer group. Optionally blocking.
244 245 246 247 248 249 250 |
# File 'lib/redis/commands/streams.rb', line 244 def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) args = [:xreadgroup, 'GROUP', group, consumer] args << 'COUNT' << count if count args << 'BLOCK' << block.to_i if block args << 'NOACK' if noack _xread(args, keys, ids, block) end |
#xrevrange(key, range_end = '+', start = '-', count: nil) ⇒ Array<Array<String, Hash>>
Fetches entries of the stream in descending order.
151 152 153 154 155 |
# File 'lib/redis/commands/streams.rb', line 151 def xrevrange(key, range_end = '+', start = '-', count: nil) args = [:xrevrange, key, range_end, start] args.concat(['COUNT', count]) if count send_command(args, &HashifyStreamEntries) end |
#xtrim(key, maxlen, strategy: 'MAXLEN', approximate: true) ⇒ Integer #xtrim(key, minid, strategy: 'MINID', approximate: true) ⇒ Integer
Trims older entries of the stream if needed.
85 86 87 88 89 90 91 92 93 |
# File 'lib/redis/commands/streams.rb', line 85 def xtrim(key, len_or_id, strategy: 'MAXLEN', approximate: false, limit: nil) strategy = strategy.to_s.upcase args = [:xtrim, key, strategy] args << '~' if approximate args << len_or_id args.concat(['LIMIT', limit]) if limit send_command(args) end |