Class: Fake::Kafka::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/fake/kafka/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(kafka) ⇒ Consumer

Returns a new instance of Consumer.



2
3
4
5
# File 'lib/fake/kafka/consumer.rb', line 2

def initialize(kafka)
  @kafka = kafka
  @topics = []
end

Instance Method Details

#each_batch(*options, &block) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/fake/kafka/consumer.rb', line 23

def each_batch(*options, &block)
  begin
    batch = Fake::Kafka::Batch.new(
      topic: @kafka.messages.first.topic,
      partition: @kafka.messages.first.partition,
      messages: @kafka.messages,
      highwater_mark_offset: @kafka.messages.first.offset
    )
    block.call(batch)
  rescue StandardError => e
    raise Kafka::ProcessingError.new(batch.topic, batch.partition, batch.highwater_mark_offset)
  end
end

#each_message(*options, &block) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/fake/kafka/consumer.rb', line 11

def each_message(*options, &block)
  @kafka.messages.each do |message|
    next unless @topics.include?(message.topic.to_sym)

    begin
      block.call(message)
    rescue StandardError => e
      raise Kafka::ProcessingError.new(message.topic, message.partition, message.offset)
    end
  end
end

#pause(topic, partition, timeout:, max_timeout: nil, exponential_backoff: false) ⇒ Object



37
38
39
40
# File 'lib/fake/kafka/consumer.rb', line 37

def pause(topic, partition, timeout:, max_timeout: nil, exponential_backoff: false)
  @kafka.paused_partitions[topic] ||= {}
  @kafka.paused_partitions[topic][partition] = true
end

#stopObject



42
# File 'lib/fake/kafka/consumer.rb', line 42

def stop; end

#subscribe(topic) ⇒ Object



7
8
9
# File 'lib/fake/kafka/consumer.rb', line 7

def subscribe(topic, **)
  @topics << topic.to_sym
end