Class: Karafka::Connection::RawMessagesBuffer

Inherits:
Object
Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/karafka/connection/raw_messages_buffer.rb

Overview

Note:

This buffer is NOT threadsafe.

Note:

We store data here in groups per topic partition to handle the revocation case, where we may need to remove messages from a single topic partition.

Buffer for raw librdkafka messages and eof status.

When a message is added to this buffer, it is appended to an array together with the other messages from the same topic and partition.


Constructor Details

#initialize ⇒ Karafka::Connection::RawMessagesBuffer

Returns a new, empty buffer instance.



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 23

def initialize
  @size = 0
  @last_polled_at = monotonic_now

  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = {
        eof: false,
        messages: []
      }
    end
  end
end
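The nested `Hash.new` blocks make the buffer auto-vivify: the first lookup of an unknown topic or partition creates and stores its entry. A minimal standalone sketch of just that structure in plain Ruby (no Karafka loaded; the topic name `"events"` and partition `3` are illustrative):

```ruby
# Same two-level structure as #initialize builds:
# topic => partition => { eof:, messages: }
groups = Hash.new do |topic_groups, topic|
  topic_groups[topic] = Hash.new do |partition_groups, partition|
    partition_groups[partition] = { eof: false, messages: [] }
  end
end

# A plain lookup of a missing key runs the default block, which stores the entry
state = groups["events"][3]
puts state[:eof]                   # => false
puts groups.keys.inspect           # => ["events"]
puts groups["events"].keys.inspect # => [3]

# key? only checks membership and never triggers the default block
puts groups.key?("other")          # => false
puts groups.keys.inspect           # => ["events"]
```

This is also why `#delete` guards with `key?` and `fetch` before touching the hash: a plain `@groups[topic]` lookup would create the very entry it is trying to remove.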

Instance Attribute Details

#last_polled_at ⇒ Float (readonly)

Returns last polling time in milliseconds (monotonic).

Returns:

  • (Float)

    last polling time in milliseconds (monotonic)



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 20

def last_polled_at
  @last_polled_at
end

#size ⇒ Integer (readonly)

Returns the total number of messages currently held in the buffer, across all topic partitions.



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 17

def size
  @size
end

Instance Method Details

#<<(message) ⇒ Array<Rdkafka::Consumer::Message>

Adds a message to the buffer.

Parameters:

  • message (Rdkafka::Consumer::Message)

    raw rdkafka message

Returns:

  • (Array<Rdkafka::Consumer::Message>)

    given partition topic sub-buffer array



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 41

def <<(message)
  @size += 1
  partition_state = @groups[message.topic][message.partition]
  partition_state[:messages] << message
  partition_state[:eof] = false
end
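A usage sketch of `#<<`. It assumes the `karafka` gem is not loaded, so a hypothetical `FakeMessage` Struct stands in for `Rdkafka::Consumer::Message` (exposing only the `topic`, `partition` and `offset` readers the buffer uses), and `MiniBuffer` is a hypothetical cut-down copy of the class, not the real one.

```ruby
# Hypothetical stand-in for Rdkafka::Consumer::Message
FakeMessage = Struct.new(:topic, :partition, :offset)

# Hypothetical cut-down copy: only the #initialize structure and #<<
class MiniBuffer
  attr_reader :size, :groups

  def initialize
    @size = 0
    @groups = Hash.new do |topics, topic|
      topics[topic] = Hash.new { |parts, part| parts[part] = { eof: false, messages: [] } }
    end
  end

  def <<(message)
    @size += 1
    state = @groups[message.topic][message.partition]
    state[:messages] << message
    state[:eof] = false
  end
end

buffer = MiniBuffer.new
buffer << FakeMessage.new("events", 0, 10)
buffer << FakeMessage.new("events", 0, 11)
buffer << FakeMessage.new("events", 1, 5)

puts buffer.size                                                 # => 3
puts buffer.groups["events"][0][:messages].map(&:offset).inspect # => [10, 11]
```

Note that, as written, the last expression of `#<<` is the `eof` assignment, so the call evaluates to `false` rather than to the sub-buffer array.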

#clear ⇒ Object

Note:

We do not clear the whole groups hash; instead, we clear the partition hashes, which saves us some object allocations. We cannot clear the underlying arrays, as they may be used in other threads for data processing: clearing one could empty a raw messages array that belongs to a job still waiting in the jobs queue.

Note:

We do not clear the eof assignments, because they can span across batch polls. Since eof is not raised continuously and is silenced after an eof poll, clearing it here would make us lose track of it. It should be reset only when we discover new messages for the given topic partition.

Removes all the data from the buffer.



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 116

def clear
  @size = 0
  @groups.each_value(&:clear)
end
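The first note about `#clear` is easiest to see in plain Ruby: emptying the partition hashes (what `@groups.each_value(&:clear)` does) only drops the buffer's references, while `Array#clear` would empty the very array a queued job still holds. In this sketch, `job_batch` and `job_view` are hypothetical references standing in for a queued job.

```ruby
groups = Hash.new do |topics, topic|
  topics[topic] = Hash.new { |parts, part| parts[part] = { eof: false, messages: [] } }
end
groups["events"][0][:messages].push(:m1, :m2)

# A queued job keeps a reference to the same array object
job_batch = groups["events"][0][:messages]

# What #clear does: empty each topic's partition hash; the arrays are not touched
groups.each_value(&:clear)
puts job_batch.inspect       # => [:m1, :m2]  (the job still has its data)
puts groups["events"].empty? # => true        (the buffer dropped its reference)
puts groups.keys.inspect     # => ["events"]  (topic-level hash kept for reuse)

# The approach the note rules out: clearing the shared array in place
shared = [:m3]
job_view = shared
shared.clear
puts job_view.inspect        # => []          (the job would lose its messages)
```

Keeping the topic-level hashes is where the saved allocations come from: the next poll for the same topic reuses the existing hash object.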

#delete(topic, partition) ⇒ Object

Removes the given topic partition's data from the buffer. This is used when a partition is revoked.

Parameters:

  • topic (String)

    topic we’re interested in

  • partition (Integer)

    partition of which data we want to remove



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 78

def delete(topic, partition)
  return unless @groups.key?(topic)
  return unless @groups.fetch(topic).key?(partition)

  topic_data = @groups.fetch(topic)
  topic_data.delete(partition)

  recount!

  # If there are no more partitions to handle in a given topic, remove it completely
  @groups.delete(topic) if topic_data.empty?
end
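A sketch of the revocation flow through `#delete`. `FakeMessage` and `MiniBuffer` are hypothetical stand-ins, and `recount!` is private and not shown on this page, so its body here (re-deriving `@size` from the remaining messages) is an assumption.

```ruby
FakeMessage = Struct.new(:topic, :partition, :offset) # hypothetical stand-in

class MiniBuffer
  attr_reader :size, :groups

  def initialize
    @size = 0
    @groups = Hash.new do |topics, topic|
      topics[topic] = Hash.new { |parts, part| parts[part] = { eof: false, messages: [] } }
    end
  end

  def <<(message)
    @size += 1
    @groups[message.topic][message.partition][:messages] << message
  end

  # Same guard logic as the documented #delete
  def delete(topic, partition)
    return unless @groups.key?(topic)
    return unless @groups.fetch(topic).key?(partition)

    topic_data = @groups.fetch(topic)
    topic_data.delete(partition)
    recount!
    @groups.delete(topic) if topic_data.empty?
  end

  private

  # ASSUMPTION: re-derives @size from the messages left in the buffer
  def recount!
    @size = @groups.each_value.sum { |parts| parts.each_value.sum { |d| d[:messages].size } }
  end
end

buffer = MiniBuffer.new
buffer << FakeMessage.new("events", 0, 1)
buffer << FakeMessage.new("events", 1, 2)
buffer << FakeMessage.new("events", 1, 3)

buffer.delete("events", 1)         # partition 1 revoked
size_after_revoke = buffer.size    # 1
buffer.delete("events", 0)         # last partition of the topic
puts buffer.groups.key?("events")  # => false (empty topic removed)
buffer.delete("unknown", 7)        # no-op thanks to the key? guards
puts buffer.groups.key?("unknown") # => false
```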

#each {|topic, partition, messages, eof| ... } ⇒ Object

Iterates over all buffered topic partitions, yielding each one's messages and eof status.

Yield Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • messages (Array<Rdkafka::Consumer::Message>)

    messages aggregated for this topic partition

  • eof (Boolean)

    whether polling of this partition reached eof



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 66

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof])
    end
  end
end
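A sketch of what a caller of `#each` receives. The hash literal mimics the buffer's internal `@groups` shape with made-up values, and `each_partition` is a hypothetical free-standing copy of the traversal.

```ruby
# Stand-in data in the same shape as @groups (illustrative values)
groups = {
  "events" => {
    0 => { eof: false, messages: [:a, :b] },
    1 => { eof: true, messages: [] }
  }
}

# Same traversal as #each: one yield per topic partition
def each_partition(groups)
  groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof])
    end
  end
end

seen = []
each_partition(groups) do |topic, partition, messages, eof|
  seen << [topic, partition, messages.size, eof]
end
p seen # => [["events", 0, 2, false], ["events", 1, 0, true]]
```

Partitions with no messages are still yielded, so a caller can observe eof even when a poll brought no new data for that partition.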

#eof(topic, partition) ⇒ Object

Marks the given topic partition as having reached eof.

Parameters:

  • topic (String)

    topic that reached eof

  • partition (Integer)

    partition that reached eof



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 51

def eof(topic, partition)
  @groups[topic][partition][:eof] = true
end
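Because `#eof` goes through the same auto-vivifying lookup, it can record eof for a partition that has no buffered messages, and a later `#<<` for that partition resets the flag. A plain-Ruby sketch of that lifecycle on a stand-in `groups` hash (not the real buffer object):

```ruby
groups = Hash.new do |topics, topic|
  topics[topic] = Hash.new { |parts, part| parts[part] = { eof: false, messages: [] } }
end

# Equivalent of #eof("events", 2): creates the entry if needed and sets the flag
groups["events"][2][:eof] = true
state = groups["events"][2]
eof_before = state[:eof]
puts eof_before              # => true
puts state[:messages].empty? # => true (eof recorded without buffered messages)

# Equivalent of #<< for the same partition: new data resets the flag
state[:messages] << :new_message
state[:eof] = false
puts state[:eof]             # => false
```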

#polled ⇒ Object

Records the current monotonic time as the last polling time, which can be read via `#last_polled_at`.



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 56

def polled
  @last_polled_at = monotonic_now
end
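`monotonic_now` comes from `Karafka::Core::Helpers::Time`, whose body is not shown here. The sketch below assumes it returns a monotonic timestamp in milliseconds as a Float (matching the `#last_polled_at` docs) and emulates it with `Process.clock_gettime`. `PollTracker` and `since_last_poll` are hypothetical names used only to show how the timestamp can be consumed.

```ruby
class PollTracker
  attr_reader :last_polled_at

  def initialize
    @last_polled_at = monotonic_now
  end

  # Same idea as #polled: stamp the current monotonic time
  def polled
    @last_polled_at = monotonic_now
  end

  # Hypothetical helper: milliseconds elapsed since the last recorded poll
  def since_last_poll
    monotonic_now - @last_polled_at
  end

  private

  # ASSUMPTION: emulates Karafka::Core::Helpers::Time#monotonic_now (ms, Float)
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

tracker = PollTracker.new
sleep 0.05
waited = tracker.since_last_poll
puts waited.round   # roughly 50; the exact value varies
tracker.polled
fresh = tracker.since_last_poll
puts fresh < waited # expected: true
```

A monotonic clock is the natural choice for such intervals because, unlike wall-clock time, it does not jump when the system clock is adjusted.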

#uniq! ⇒ Object

Removes duplicate messages (messages with the same offset) within each partition. This should be used only when a rebalance occurs: because processing resumes from the last offset, we may receive data we already have. In such cases, we want to keep duplicates to a minimum.



# File 'lib/karafka/connection/raw_messages_buffer.rb', line 95

def uniq!
  @groups.each_value do |partitions|
    partitions.each_value do |details|
      details[:messages].uniq!(&:offset)
    end
  end

  recount!
end
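`Array#uniq!` with a block keeps the first element for each key and removes later ones in place, so deduplicating by `offset` keeps the copy that was buffered first. A sketch on a single partition's array, using a hypothetical `FakeMessage` Struct in place of `Rdkafka::Consumer::Message`:

```ruby
FakeMessage = Struct.new(:topic, :partition, :offset, :payload) # hypothetical

# After a rebalance the same offsets may be fetched again
messages = [
  FakeMessage.new("events", 0, 10, "first"),
  FakeMessage.new("events", 0, 11, "first"),
  FakeMessage.new("events", 0, 10, "refetched"),
  FakeMessage.new("events", 0, 11, "refetched")
]

# Same call as in #uniq!: keeps the first message seen for each offset
messages.uniq!(&:offset)
p messages.map(&:offset)  # => [10, 11]
p messages.map(&:payload) # => ["first", "first"]
```

Deduplicating per partition rather than across the whole buffer matters because Kafka offsets are only unique within a single topic partition.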