Class: Karafka::Connection::MessagesBuffer
- Inherits:
- Object
- Defined in:
- lib/karafka/connection/messages_buffer.rb
Overview
Note: This buffer is NOT thread safe. That is acceptable because it is never used outside of the main listener loop. It can be cleared once jobs have been scheduled with the messages it stores: clearing the buffer does not empty the messages arrays themselves, so jobs that already hold a reference to them are unaffected.
Buffer used to build and store karafka messages built based on raw librdkafka messages.
Why do we have two buffers? `RawMessagesBuffer` is used to store raw messages and to handle
cases related to partition revocation and reconnections. It is "internal" to the listening
process. `MessagesBuffer`, on the other hand, is used to "translate" the raw messages that
we know are ok into Karafka messages and to simplify further work with them.
While it adds a bit of overhead, it makes things conceptually much simpler, and the cost is only two
simple hash iterations over the messages batch.
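The remark about clearing can be illustrated with plain Ruby collections. This is a minimal sketch, not Karafka code: `build_groups` and `schedule_then_clear` are hypothetical helpers, and the hash only imitates the topic, partition, details layout used by the buffer.

```ruby
# Hypothetical stand-in for the buffer's internal topic => partition => details hash.
def build_groups
  { "events" => { 0 => { eof: false, messages: [:m1, :m2] } } }
end

# Simulates scheduling a job and then clearing the buffer.
# Returns the array the "job" still holds after the clear.
def schedule_then_clear(groups)
  scheduled = groups["events"][0][:messages] # the job keeps this reference
  groups.clear                               # removes the hash entries only
  scheduled
end

groups = build_groups
held = schedule_then_clear(groups)

puts groups.empty? # the buffer-side hash no longer has entries
puts held.size     # the array held by the job still has its elements
```

`Hash#clear` removes the entries from the hash but never touches the arrays those entries pointed to, which is why a cleared buffer does not affect work that was already scheduled.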
Instance Attribute Summary
- #size ⇒ Object (readonly)
  Returns the total number of messages currently held in the buffer.
Instance Method Summary
- #each {|topic, partition, messages, eof, last_polled_at| ... } ⇒ Object
  Iterates over the messages of every topic partition in the buffer.
- #empty? ⇒ Boolean
  Whether the buffer contains no messages.
- #initialize(subscription_group) ⇒ MessagesBuffer (constructor)
  A new instance of MessagesBuffer.
- #present?(topic, partition) ⇒ Boolean
  Checks if there are any messages from a given topic partition in the buffer.
- #remap(raw_messages_buffer) ⇒ Object
  Remaps raw messages from the raw messages buffer to Karafka messages.
Constructor Details
#initialize(subscription_group) ⇒ MessagesBuffer
Returns a new instance of MessagesBuffer.
# File 'lib/karafka/connection/messages_buffer.rb', line 23

def initialize(subscription_group)
  @subscription_group = subscription_group
  @size = 0
  @groups = Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = {
        eof: false,
        messages: []
      }
    end
  end
end
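The constructor relies on nested `Hash.new` default blocks, so that a topic and partition entry is created the first time it is read. A minimal sketch of the same pattern outside of any class follows; `build_partition_groups` is a hypothetical helper name used only for illustration.

```ruby
# Same nested default-block pattern as in the constructor, outside of any class.
def build_partition_groups
  Hash.new do |topic_groups, topic|
    topic_groups[topic] = Hash.new do |partition_groups, partition|
      partition_groups[partition] = { eof: false, messages: [] }
    end
  end
end

groups = build_partition_groups

# Reading a missing topic and partition runs both default blocks
# and stores the freshly built entry in the hash.
groups["events"][0][:messages] << :first_message

puts groups.keys.inspect
puts groups["events"][0][:messages].size
```

Because each default block assigns into the hash it receives, the entry persists after the first lookup instead of being rebuilt on every access.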
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
# File 'lib/karafka/connection/messages_buffer.rb', line 20

def size
  @size
end
Instance Method Details
#each {|topic, partition, messages, eof, last_polled_at| ... } ⇒ Object
Iterates over the messages of every topic partition in the buffer.
# File 'lib/karafka/connection/messages_buffer.rb', line 75

def each
  @groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof], details[:last_polled_at])
    end
  end
end
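To show what a block passed to `#each` receives, here is a small sketch that walks a hash shaped like the one built by `#remap`. `each_partition` and `SAMPLE_GROUPS` are hypothetical stand-ins, not part of Karafka, and the sample values are made up.

```ruby
# Stand-in that yields the same five values as MessagesBuffer#each.
def each_partition(groups)
  groups.each do |topic, partitions|
    partitions.each do |partition, details|
      yield(topic, partition, details[:messages], details[:eof], details[:last_polled_at])
    end
  end
end

# Hypothetical sample data shaped like the hash that #remap builds.
SAMPLE_GROUPS = {
  "events" => {
    0 => { eof: false, messages: [:a, :b], last_polled_at: 1.5 },
    1 => { eof: true, messages: [], last_polled_at: 1.5 }
  }
}.freeze

each_partition(SAMPLE_GROUPS) do |topic, partition, messages, eof, last_polled_at|
  puts "#{topic}/#{partition}: #{messages.size} messages, eof=#{eof}, polled_at=#{last_polled_at}"
end
```

Entries created by the constructor's default block carry no `:last_polled_at` key, so for those the fifth yielded value would be `nil`.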
#empty? ⇒ Boolean
Returns whether the buffer is empty, that is, whether it contains no messages.
# File 'lib/karafka/connection/messages_buffer.rb', line 96

def empty?
  @size.zero?
end
#present?(topic, partition) ⇒ Boolean
Checks if there are any messages from a given topic partition in the buffer.
# File 'lib/karafka/connection/messages_buffer.rb', line 88

def present?(topic, partition)
  return false unless @groups.include?(topic)
  return false unless @groups[topic].include?(partition)

  true
end
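`#present?` guards every lookup with `include?`. With a hash that has a default block, this matters: `[]` on a missing key would run the block and create an entry, while `include?` only checks membership. The sketch below shows the difference on a simplified, single-level hash; `build_topic_groups` and `known_topic?` are hypothetical names.

```ruby
# Hash with the same kind of default block the constructor uses (simplified to one level).
def build_topic_groups
  Hash.new { |groups, topic| groups[topic] = {} }
end

# Mirrors the guard style of #present?: a membership check, no `[]` on unknown keys.
def known_topic?(groups, topic)
  groups.include?(topic)
end

groups = build_topic_groups

puts known_topic?(groups, "events") # include? does not run the default block
puts groups.size

groups["events"]                    # `[]` on a missing key runs the block and stores an entry
puts known_topic?(groups, "events")
puts groups.size
```

Checking the topic first and only then indexing into it keeps a read-only query from adding empty topic or partition entries as a side effect.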
#remap(raw_messages_buffer) ⇒ Object
Remaps raw messages from the raw messages buffer to Karafka messages.
# File 'lib/karafka/connection/messages_buffer.rb', line 39

def remap(raw_messages_buffer)
  clear

  # Since it happens "right after" we've received the messages, it is close enough in time
  # to be used as the moment we received messages.
  received_at = Time.now
  last_polled_at = raw_messages_buffer.last_polled_at

  raw_messages_buffer.each do |topic, partition, messages, eof|
    @size += messages.count

    ktopic = @subscription_group.topics.find(topic)

    built_messages = messages.map do |message|
      Messages::Builders::Message.call(
        message,
        ktopic,
        received_at
      )
    end

    @groups[topic][partition] = {
      eof: eof,
      messages: built_messages,
      last_polled_at: last_polled_at
    }
  end
end
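The remap flow (clear, count, build, store per topic partition) can be tried without Karafka using stand-ins. Everything in this sketch is hypothetical: `FakeRawBuffer`, `BuiltMessage` and `SketchBuffer` are illustration-only names, the builder is replaced by a `Struct`, and the reset at the start of `remap` is an assumption about what `clear` does, since its body is not shown on this page.

```ruby
# Hypothetical stand-in for the raw buffer: yields topic, partition, raw messages and eof.
class FakeRawBuffer
  attr_reader :last_polled_at

  def initialize(data, last_polled_at)
    @data = data
    @last_polled_at = last_polled_at
  end

  def each
    @data.each do |topic, partitions|
      partitions.each do |partition, details|
        yield(topic, partition, details[:messages], details[:eof])
      end
    end
  end
end

# Hypothetical stand-in for a built message; the real code calls Messages::Builders::Message.
BuiltMessage = Struct.new(:raw, :topic, :received_at)

# Simplified buffer following the same steps as #remap.
class SketchBuffer
  attr_reader :size, :groups

  def initialize
    @size = 0
    @groups = {}
  end

  def remap(raw_buffer)
    # Assumed behavior of `clear`: reset the counter and drop previous entries.
    @size = 0
    @groups.clear

    received_at = Time.now
    last_polled_at = raw_buffer.last_polled_at

    raw_buffer.each do |topic, partition, raw_messages, eof|
      @size += raw_messages.count
      built = raw_messages.map { |raw| BuiltMessage.new(raw, topic, received_at) }
      (@groups[topic] ||= {})[partition] = {
        eof: eof, messages: built, last_polled_at: last_polled_at
      }
    end
  end
end

raw = FakeRawBuffer.new({ "events" => { 0 => { messages: %w[a b], eof: false } } }, 12.5)
buffer = SketchBuffer.new
buffer.remap(raw)
puts buffer.size
```

All messages built in one remap share a single `received_at` timestamp and a single `last_polled_at` value, which matches the comment in the original method about the time being "close enough".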