Class: Fake::Kafka::Consumer
- Inherits:
-
Object
- Object
- Fake::Kafka::Consumer
- Defined in:
- lib/fake/kafka/consumer.rb
Instance Method Summary collapse
- #each_batch(*options, &block) ⇒ Object
- #each_message(*options, &block) ⇒ Object
-
#initialize(kafka) ⇒ Consumer
constructor
A new instance of Consumer.
- #pause(topic, partition, timeout:, max_timeout: nil, exponential_backoff: false) ⇒ Object
- #stop ⇒ Object
- #subscribe(topic) ⇒ Object
Constructor Details
#initialize(kafka) ⇒ Consumer
Returns a new instance of Consumer.
2 3 4 5 |
# File 'lib/fake/kafka/consumer.rb', line 2 def initialize(kafka) @kafka = kafka @topics = [] end |
Instance Method Details
#each_batch(*options, &block) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/fake/kafka/consumer.rb', line 23 def each_batch(*options, &block) begin batch = Fake::Kafka::Batch.new( topic: @kafka.messages.first.topic, partition: @kafka.messages.first.partition, messages: @kafka.messages, highwater_mark_offset: @kafka.messages.first.offset ) block.call(batch) rescue StandardError => e raise Kafka::ProcessingError.new(batch.topic, batch.partition, batch.highwater_mark_offset) end end |
#each_message(*options, &block) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/fake/kafka/consumer.rb', line 11 def each_message(*options, &block) @kafka.messages.each do |message| next unless @topics.include?(message.topic.to_sym) begin block.call(message) rescue StandardError => e raise Kafka::ProcessingError.new(message.topic, message.partition, message.offset) end end end |
#pause(topic, partition, timeout:, max_timeout: nil, exponential_backoff: false) ⇒ Object
37 38 39 40 |
# File 'lib/fake/kafka/consumer.rb', line 37 def pause(topic, partition, timeout:, max_timeout: nil, exponential_backoff: false) @kafka.paused_partitions[topic] ||= {} @kafka.paused_partitions[topic][partition] = true end |
#stop ⇒ Object
42 |
# File 'lib/fake/kafka/consumer.rb', line 42 def stop; end |
#subscribe(topic) ⇒ Object
7 8 9 |
# File 'lib/fake/kafka/consumer.rb', line 7 def subscribe(topic, **) @topics << topic.to_sym end |