Class: RedisStream::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/redis_stream/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:) ⇒ Stream

Returns a new instance of Stream.



7
8
9
10
# File 'lib/redis_stream/stream.rb', line 7

# Sets up a new stream object for the given name.
#
# @param name [Object] stored as-is in @name
def initialize(name:)
  # Begins as an empty array; none of the methods shown alongside this
  # constructor read it.
  @values = []
  @name = name
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/redis_stream/stream.rb', line 5

# Reader for the stream's name.
#
# @return [Object] the value currently held in @name
def name
  @name
end

Instance Method Details

#<<(value) ⇒ Object Also known as: push



12
13
14
15
# File 'lib/redis_stream/stream.rb', line 12

# Appends a value to the stream.
#
# The value is run through +dump+ and the result is handed to +xadd+ on
# +Redis.current+, keyed by this stream's name.
#
# @param value [Object] the item to append
# @return [self] the receiver, so appends can be chained
def <<(value)
  redis = Redis.current
  redis.xadd(name, dump(value))
  self
end

#clear ⇒ Object



18
19
20
# File 'lib/redis_stream/stream.rb', line 18

# Trims the stream down to a maximum length of zero entries.
#
# @return [Object] whatever +xtrim+ on +Redis.current+ returns
def clear
  redis = Redis.current
  redis.xtrim(name, 0)
end

#each(&block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/redis_stream/stream.rb', line 39

# Yields every value in the stream, oldest first.
#
# Entries are fetched one at a time with +xread+, starting after id "0" and
# then after the id of the entry just read; each entry's content is passed
# through +load+ before it is yielded. Iteration stops as soon as +xread+
# returns an empty result.
#
# @yieldparam value [Object] the result of +load+ for one entry
# @return [Enumerator] when called without a block
# @return [nil] when called with a block
def each(&block)
  # Without a block there is nothing to call, so hand back an Enumerator
  # rather than failing on the first message.
  return enum_for(:each) unless block_given?

  current_message_id = "0"

  loop do
    result = Redis.current.xread(name, current_message_id, count: 1)
    break if result.empty?

    # Each entry is an [id, content] pair; the id becomes the cursor for
    # the next read.
    current_message_id, message_content = result[name].first
    block.call(load(message_content))
  end
end

#last(count = 1) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/redis_stream/stream.rb', line 22

# Fetches the most recent values from the stream.
#
# Entries are requested newest-first via +xrevrange+, put back into
# oldest-first order, and each entry's content is passed through +load+.
#
# @param count [Integer] how many entries to request (defaults to 1)
# @return [Object, nil] a single value (or nil if there is none) when
#   +count+ equals 1
# @return [Array] the loaded values, oldest first, for any other +count+
def last(count = 1)
  newest_first = Redis.current.xrevrange(name, '+', '-', count: count)
  values = newest_first.reverse.map { |_id, content| load(content) }
  return values.first if count == 1

  values
end

#length ⇒ Object Also known as: size



34
35
36
# File 'lib/redis_stream/stream.rb', line 34

# Reports the stream's length.
#
# @return [Object] whatever +xlen+ on +Redis.current+ returns for this
#   stream's name
def length
  redis = Redis.current
  redis.xlen(name)
end