Module: Karafka::Messages::Builders::Message

Defined in:
lib/karafka/messages/builders/message.rb

Overview

Builder of a single message based on a raw rdkafka message.

Class Method Summary

Class Method Details

.call(kafka_message, topic, received_at) ⇒ Karafka::Messages::Message

Returns a message object with payload and metadata.

Parameters:

  • kafka_message (Rdkafka::Consumer::Message)

    raw fetched message

  • topic (Karafka::Routing::Topic)

    topic for which this message was fetched

  • received_at (Time)

    moment when we’ve received the message

Returns:

  • (Karafka::Messages::Message)

    message object with payload and metadata

# File 'lib/karafka/messages/builders/message.rb', line 14

def call(kafka_message, topic, received_at)
  # @see https://github.com/appsignal/rdkafka-ruby/issues/168
  kafka_message.headers.transform_keys!(&:to_s)

  metadata = Karafka::Messages::Metadata.new(
    timestamp: kafka_message.timestamp,
    headers: kafka_message.headers,
    key: kafka_message.key,
    offset: kafka_message.offset,
    deserializer: topic.deserializer,
    partition: kafka_message.partition,
    topic: topic.name,
    received_at: received_at
  ).freeze

  # Karafka messages cannot be frozen because of the lazy deserialization feature
  Karafka::Messages::Message.new(
    kafka_message.payload,
    metadata
  )
end
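
The builder above does three things: it normalizes header keys to strings, freezes the metadata, and leaves the message itself unfrozen so that the payload can be deserialized lazily. The following is a minimal sketch of that pattern and is not the Karafka implementation. `RawMessage`, `Topic`, `Metadata`, `LazyMessage` and `build_message` are hypothetical stand-ins defined here so the example runs without rdkafka or Karafka, and the deserializer is assumed to be a simple callable that takes the raw payload.

```ruby
require 'json'

# Hypothetical stand-ins for Rdkafka::Consumer::Message, Karafka::Routing::Topic
# and Karafka::Messages::Metadata. They only carry the fields used below.
RawMessage = Struct.new(
  :payload, :key, :offset, :partition, :timestamp, :headers,
  keyword_init: true
)
Topic = Struct.new(:name, :deserializer, keyword_init: true)
Metadata = Struct.new(
  :timestamp, :headers, :key, :offset, :deserializer, :partition, :topic, :received_at,
  keyword_init: true
)

# Simplified message (assumption): keeps the raw payload and deserializes it
# only on first access.
class LazyMessage
  attr_reader :raw_payload, :metadata

  def initialize(raw_payload, metadata)
    @raw_payload = raw_payload
    @metadata = metadata
    @deserialized = false
  end

  # Memoizing the result assigns instance variables, so calling this on a
  # frozen object would raise FrozenError. That is why the message stays unfrozen.
  def payload
    return @payload if @deserialized

    @payload = metadata.deserializer.call(raw_payload)
    @deserialized = true
    @payload
  end
end

# Mirrors the steps of the builder shown above, using the stand-in types.
def build_message(kafka_message, topic, received_at)
  # Normalize header keys in place so consumers always see String keys
  kafka_message.headers.transform_keys!(&:to_s)

  metadata = Metadata.new(
    timestamp: kafka_message.timestamp,
    headers: kafka_message.headers,
    key: kafka_message.key,
    offset: kafka_message.offset,
    deserializer: topic.deserializer,
    partition: kafka_message.partition,
    topic: topic.name,
    received_at: received_at
  ).freeze

  LazyMessage.new(kafka_message.payload, metadata)
end

raw = RawMessage.new(
  payload: '{"id":1}',
  key: 'k',
  offset: 42,
  partition: 0,
  timestamp: Time.now,
  headers: { trace_id: 'abc' }
)
topic = Topic.new(name: 'events', deserializer: ->(raw_payload) { JSON.parse(raw_payload) })

message = build_message(raw, topic, Time.now)
```

In this sketch the JSON string is parsed only when `message.payload` is first called, while `message.metadata` is frozen as soon as it is built. Note that `freeze` is shallow, so freezing the metadata object does not by itself freeze the headers hash it references.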