Module: NulogyMessageBusConsumer::KafkaUtils
- Defined in:
- lib/nulogy_message_bus_consumer/kafka_utils.rb
Constant Summary collapse
- BEGINNING_OFFSET =
0
- END_OFFSET =
-1
Class Method Summary collapse
- .assigned_partitions(consumer) ⇒ Object
- .every_message_until_none_are_left(consumer, timeout = 250) ⇒ Object
- .seek_beginning(consumer) ⇒ Object
- .seek_ending(consumer) ⇒ Object
- .wait_for(attempts: 1000, interval: 0.1) ⇒ Object
- .wait_for_assignment(consumer) ⇒ Object
- .wait_for_unassignment(consumer) ⇒ Object
Class Method Details
.assigned_partitions(consumer) ⇒ Object
54 55 56 57 58 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 54 def assigned_partitions(consumer) consumer.assignment.to_h .flat_map { |topic_name, partitions| [topic_name].product(partitions) } .map { |topic_name, partition| [topic_name, partition.partition] } end |
.every_message_until_none_are_left(consumer, timeout = 250) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 22 def (consumer, timeout = 250) Enumerator.new do |yielder| while ( = consumer.poll(timeout)) yielder.yield() end end end |
.seek_beginning(consumer) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 30 def seek_beginning(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| = Message.new( topic: topic_name, partition: partition, offset: BEGINNING_OFFSET ) consumer.seek() end end |
.seek_ending(consumer) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 42 def seek_ending(consumer) wait_for_assignment(consumer) assigned_partitions(consumer).each do |topic_name, partition| = Message.new( topic: topic_name, partition: partition, offset: END_OFFSET ) consumer.seek() end end |
.wait_for(attempts: 1000, interval: 0.1) ⇒ Object
13 14 15 16 17 18 19 20 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 13 def wait_for(attempts: 1000, interval: 0.1) attempts.times do return if yield sleep interval end raise "wait_for timed out!" end |
.wait_for_assignment(consumer) ⇒ Object
5 6 7 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 5 def wait_for_assignment(consumer) wait_for { !consumer.assignment.empty? } end |
.wait_for_unassignment(consumer) ⇒ Object
9 10 11 |
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 9 def wait_for_unassignment(consumer) wait_for { consumer.assignment.empty? } end |