Class: RedisStream::Consumer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/redis_stream/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, group:, stream:) ⇒ Consumer

Returns a new instance of Consumer.



7
8
9
10
11
# File 'lib/redis_stream/consumer.rb', line 7

def initialize(name:, group:, stream:)
  @name = name
  @group = group
  @stream = stream
end

Instance Attribute Details

#groupObject (readonly)

Returns the value of attribute group.



5
6
7
# File 'lib/redis_stream/consumer.rb', line 5

def group
  @group
end

#nameObject (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/redis_stream/consumer.rb', line 5

def name
  @name
end

#streamObject (readonly)

Returns the value of attribute stream.



5
6
7
# File 'lib/redis_stream/consumer.rb', line 5

def stream
  @stream
end

Instance Method Details

#each(&block) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/redis_stream/consumer.rb', line 13

def each(&block)
  while
    result = read_next
    break if result.empty?

    message = result[stream.name].first
    id, content = message
    block.call(load(content))
    Redis.current.xack(stream.name, name, id)
  end
end