Module: Kafka::Encoder

Defined in:
lib/kafka/encoder.rb

Class Method Summary collapse

Class Method Details

.message(message, compression = Message::NO_COMPRESSION) ⇒ Object



18
19
20
# File 'lib/kafka/encoder.rb', line 18

def self.message(message, compression = Message::NO_COMPRESSION)
  message.encode(compression)
end

.message_block(topic, partition, messages, compression) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/kafka/encoder.rb', line 22

def self.message_block(topic, partition, messages, compression)
  message_set = message_set(messages, compression)

  topic     = [topic.length].pack("n") + topic
  partition = [partition].pack("N")
  messages  = [message_set.length].pack("N") + message_set

  return topic + partition + messages
end

.message_set(messages, compression) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/kafka/encoder.rb', line 32

def self.message_set(messages, compression)
  message_set = Array(messages).collect { |message|
    self.message(message)
  }.join("")
  message_set = self.message(Message.new(message_set), compression) unless compression == Message::NO_COMPRESSION
  message_set
end

.multiproduce(producer_requests, compression = Message::NO_COMPRESSION) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/kafka/encoder.rb', line 47

def self.multiproduce(producer_requests, compression = Message::NO_COMPRESSION)
  part_set = Array(producer_requests).map { |req|
    self.message_block(req.topic, req.partition, req.messages, compression)
  }

  request = [RequestType::MULTIPRODUCE].pack("n")
  parts = [part_set.length].pack("n") + part_set.join("")
  data = request + parts

  return [data.length].pack("N") + data
end

.produce(topic, partition, messages, compression = Message::NO_COMPRESSION) ⇒ Object



40
41
42
43
44
45
# File 'lib/kafka/encoder.rb', line 40

def self.produce(topic, partition, messages, compression = Message::NO_COMPRESSION)
  request = [RequestType::PRODUCE].pack("n")
  data = request + self.message_block(topic, partition, messages, compression)

  return [data.length].pack("N") + data
end