Module: NulogyMessageBusConsumer::KafkaUtils

Defined in:
lib/nulogy_message_bus_consumer/kafka_utils.rb

Constant Summary

BEGINNING_OFFSET =
0
END_OFFSET =
-1

Class Method Summary

Class Method Details

.assigned_partitions(consumer) ⇒ Object



54
55
56
57
58
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 54

# Flattens the consumer's current assignment into topic/partition pairs.
#
# The assignment is converted with `to_h` and read as a mapping of topic name
# to a list of partition entries; each entry contributes the value of its
# `partition` attribute.
#
# @param consumer [#assignment] consumer whose assignment is inspected
#   (presumably an Rdkafka consumer -- confirm against callers)
# @return [Array<Array>] one `[topic_name, partition]` pair per assigned
#   partition, in the iteration order of the assignment hash
def assigned_partitions(consumer)
  consumer.assignment.to_h.flat_map do |topic_name, entries|
    entries.map { |entry| [topic_name, entry.partition] }
  end
end

.every_message_until_none_are_left(consumer, timeout = 250) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 22

# Builds an enumerator that drains the consumer by polling repeatedly.
#
# Nothing is polled until the enumerator is iterated. Iteration ends the
# first time `consumer.poll` returns a falsy value.
#
# @param consumer [#poll] object polled for messages
# @param timeout [Integer] value handed to every `consumer.poll` call
#   (presumably milliseconds -- confirm against the consumer's API)
# @return [Enumerator] yields each polled message in order
def every_message_until_none_are_left(consumer, timeout = 250)
  Enumerator.new do |messages|
    polled = consumer.poll(timeout)
    while polled
      messages << polled
      polled = consumer.poll(timeout)
    end
  end
end

.seek_beginning(consumer) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 30

# Seeks every assigned partition of the consumer to BEGINNING_OFFSET.
#
# Waits (via `wait_for_assignment`) until the consumer has an assignment,
# then issues one `consumer.seek` per `[topic, partition]` pair reported by
# `assigned_partitions`, using a Message that carries the target offset.
#
# @param consumer [#assignment, #seek] consumer to reposition
# @return [Array] the pairs returned by `assigned_partitions`
def seek_beginning(consumer)
  wait_for_assignment(consumer)

  assigned_partitions(consumer).each do |topic, partition_id|
    consumer.seek(Message.new(topic: topic, partition: partition_id, offset: BEGINNING_OFFSET))
  end
end

.seek_ending(consumer) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 42

# Seeks every assigned partition of the consumer to END_OFFSET.
#
# Waits (via `wait_for_assignment`) until the consumer has an assignment,
# then issues one `consumer.seek` per `[topic, partition]` pair reported by
# `assigned_partitions`, using a Message that carries the target offset.
#
# @param consumer [#assignment, #seek] consumer to reposition
# @return [Array] the pairs returned by `assigned_partitions`
def seek_ending(consumer)
  wait_for_assignment(consumer)

  assigned_partitions(consumer).each do |topic, partition_id|
    consumer.seek(Message.new(topic: topic, partition: partition_id, offset: END_OFFSET))
  end
end

.wait_for(attempts: 1000, interval: 0.1) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 13

# Repeatedly evaluates the given block until it returns a truthy value.
#
# The block is called at most `attempts` times. After every falsy result
# (including the last one) the method sleeps for `interval` before moving on.
#
# @param attempts [Integer] maximum number of times the block is evaluated
# @param interval [Numeric] argument passed to `sleep` between evaluations
# @yieldreturn [Object] truthy once the awaited condition holds
# @return [nil] as soon as the block returns a truthy value
# @raise [RuntimeError] "wait_for timed out!" when every attempt was falsy
def wait_for(attempts: 1000, interval: 0.1)
  condition_met = attempts.times.any? do
    next true if yield

    sleep interval
    false
  end

  raise "wait_for timed out!" unless condition_met
end

.wait_for_assignment(consumer) ⇒ Object



5
6
7
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 5

# Blocks (through `wait_for`) until the consumer's assignment is non-empty.
#
# @param consumer [#assignment] consumer whose assignment is checked on
#   every attempt
# @return [Object] whatever `wait_for` returns
def wait_for_assignment(consumer)
  wait_for do
    current_assignment = consumer.assignment
    !current_assignment.empty?
  end
end

.wait_for_unassignment(consumer) ⇒ Object



9
10
11
# File 'lib/nulogy_message_bus_consumer/kafka_utils.rb', line 9

# Blocks (through `wait_for`) until the consumer's assignment is empty.
#
# @param consumer [#assignment] consumer whose assignment is checked on
#   every attempt
# @return [Object] whatever `wait_for` returns
def wait_for_unassignment(consumer)
  wait_for do
    current_assignment = consumer.assignment
    current_assignment.empty?
  end
end