Module: Karafka::Messages::Builders::Message

Defined in:
lib/karafka/messages/builders/message.rb

Overview

Builder of a single Karafka message based on a raw rdkafka message.

Class Method Summary

Class Method Details

.call(kafka_message, topic, received_at) ⇒ Karafka::Messages::Message

Returns message object with payload and metadata.

Parameters:

  • kafka_message (Rdkafka::Consumer::Message)

    raw fetched message

  • topic (Karafka::Routing::Topic)

    topic for which this message was fetched

  • received_at (Time)

    moment when we’ve received the message

Returns:

  • (Karafka::Messages::Message)

    message object with payload and metadata

# File 'lib/karafka/messages/builders/message.rb', line 14

def call(kafka_message, topic, received_at)
  # Build immutable metadata from the raw message and the routing topic
  metadata = Karafka::Messages::Metadata.new(
    timestamp: kafka_message.timestamp,
    headers: kafka_message.headers,
    key: kafka_message.key,
    offset: kafka_message.offset,
    deserializer: topic.deserializer,
    partition: kafka_message.partition,
    topic: topic.name,
    received_at: received_at
  ).freeze

  # Get the raw payload
  payload = kafka_message.payload

  # And nullify it in the kafka message. This can save a lot of memory when used with
  # the Pro Cleaner API
  kafka_message.instance_variable_set('@payload', nil)

  # Karafka messages cannot be frozen because of the lazy deserialization feature
  Karafka::Messages::Message.new(payload, metadata)
end
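
The builder's pattern (frozen metadata, payload detached from the raw message, deserialization deferred until first access) can be illustrated without Karafka or rdkafka installed. The following is a minimal sketch only: `RawMessage`, `SketchMetadata`, `SketchMessage` and `build_message` are hypothetical stand-ins invented for this example, not the real Karafka classes. It also assumes a deserializer that receives the raw payload string, which is a simplification.

```ruby
require 'json'

# Hypothetical stand-in for Rdkafka::Consumer::Message (not the real class)
class RawMessage
  attr_reader :payload, :key, :offset

  def initialize(payload:, key:, offset:)
    @payload = payload
    @key = key
    @offset = offset
  end
end

# Hypothetical stand-in for Karafka::Messages::Metadata
SketchMetadata = Struct.new(:key, :offset, :deserializer, :received_at, keyword_init: true)

# Hypothetical stand-in for Karafka::Messages::Message
class SketchMessage
  attr_reader :raw_payload, :metadata

  def initialize(raw_payload, metadata)
    @raw_payload = raw_payload
    @metadata = metadata
    @deserialized = false
  end

  # Deserializes on first access and memoizes the result. Memoization mutates
  # the object, which is why the message itself is not frozen.
  def payload
    return @payload if @deserialized

    @payload = metadata.deserializer.call(raw_payload)
    @deserialized = true
    @payload
  end

  def deserialized?
    @deserialized
  end
end

# Mirrors the three steps of the builder: freeze the metadata, take the raw
# payload, then clear it on the raw message so only one reference remains.
def build_message(raw, deserializer, received_at)
  metadata = SketchMetadata.new(
    key: raw.key,
    offset: raw.offset,
    deserializer: deserializer,
    received_at: received_at
  ).freeze

  payload = raw.payload
  raw.instance_variable_set(:@payload, nil)

  SketchMessage.new(payload, metadata)
end

raw = RawMessage.new(payload: '{"id":1}', key: 'k1', offset: 42)
message = build_message(raw, ->(data) { JSON.parse(data) }, Time.now)
```

In this sketch the raw message no longer holds the payload once `build_message` returns, and the JSON is parsed only when `message.payload` is first called.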