Class: Karafka::Connection::MessagesBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/connection/messages_buffer.rb

Overview

Note:

This buffer is NOT thread safe. We do not worry about it as we do not use it outside of the main listener loop. It can be cleared after the jobs are scheduled with messages it stores, because messages arrays are not “cleared” in any way directly and their reference stays.

Buffer used to build and store karafka messages built based on raw librdkafka messages.

Why do we have two buffers? ‘RawMessagesBuffer` is used to store raw messages and to handle

cases related to partition revocation and reconnections. It is "internal" to the listening
process. `MessagesBuffer` on the other hand is used to "translate" those raw messages that
we know that are ok into Karafka messages and to simplify further work with them.

While it adds a bit of overhead, it makes conceptual things much easier and it adds only two

simple hash iterations over messages batch.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription_group) ⇒ MessagesBuffer

Returns a new instance of MessagesBuffer.

Parameters:



23
24
25
26
27
28
29
30
31
# File 'lib/karafka/connection/messages_buffer.rb', line 23

def initialize(subscription_group)
  @subscription_group = subscription_group
  @size = 0
  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = []
    end
  end
end

Instance Attribute Details

#sizeObject (readonly)

Returns the value of attribute size.



20
21
22
# File 'lib/karafka/connection/messages_buffer.rb', line 20

def size
  @size
end

Instance Method Details

#each {|topic, partition, messages| ... } ⇒ Object

Allows to iterate over all the topics and partitions messages

Yield Parameters:

  • topic (String)

    name

  • partition (Integer)

    number

  • messages (Array<Karafka::Messages::Message>)

    from a given topic partition



62
63
64
65
66
67
68
# File 'lib/karafka/connection/messages_buffer.rb', line 62

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, messages|
      yield(topic, partition, messages)
    end
  end
end

#empty?Boolean

Returns is the buffer empty or does it contain any messages.

Returns:

  • (Boolean)

    is the buffer empty or does it contain any messages



71
72
73
# File 'lib/karafka/connection/messages_buffer.rb', line 71

def empty?
  @size.zero?
end

#remap(raw_messages_buffer) ⇒ Object

Remaps raw messages from the raw messages buffer to Karafka messages

Parameters:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/karafka/connection/messages_buffer.rb', line 35

def remap(raw_messages_buffer)
  clear unless @size.zero?

  # Since it happens "right after" we've received the messages, it is close enough it time
  # to be used as the moment we received messages.
  received_at = Time.now

  raw_messages_buffer.each do |topic, partition, messages|
    @size += messages.count

    ktopic = @subscription_group.topics.find(topic)

    @groups[topic][partition] = messages.map do |message|
      Messages::Builders::Message.call(
        message,
        ktopic,
        received_at
      )
    end
  end
end