Module: Kafka::Encoder
- Defined in:
- lib/kafka/encoder.rb
Class Method Summary collapse
- .message(message, compression = Message::NO_COMPRESSION) ⇒ Object
- .message_block(topic, partition, messages, compression) ⇒ Object
- .message_set(messages, compression) ⇒ Object
- .multiproduce(producer_requests, compression = Message::NO_COMPRESSION) ⇒ Object
- .produce(topic, partition, messages, compression = Message::NO_COMPRESSION) ⇒ Object
Class Method Details
.message(message, compression = Message::NO_COMPRESSION) ⇒ Object
18 19 20 |
# File 'lib/kafka/encoder.rb', line 18 def self.(, compression = Message::NO_COMPRESSION) .encode(compression) end |
.message_block(topic, partition, messages, compression) ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/kafka/encoder.rb', line 22 def self.(topic, partition, , compression) = (, compression) topic = [topic.length].pack("n") + topic partition = [partition].pack("N") = [.length].pack("N") + return topic + partition + end |
.message_set(messages, compression) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/kafka/encoder.rb', line 32 def self.(, compression) = Array().collect { || self.() }.join("") = self.(Message.new(), compression) unless compression == Message::NO_COMPRESSION end |
.multiproduce(producer_requests, compression = Message::NO_COMPRESSION) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/kafka/encoder.rb', line 47 def self.multiproduce(producer_requests, compression = Message::NO_COMPRESSION) part_set = Array(producer_requests).map { |req| self.(req.topic, req.partition, req., compression) } request = [RequestType::MULTIPRODUCE].pack("n") parts = [part_set.length].pack("n") + part_set.join("") data = request + parts return [data.length].pack("N") + data end |
.produce(topic, partition, messages, compression = Message::NO_COMPRESSION) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/kafka/encoder.rb', line 40 def self.produce(topic, partition, , compression = Message::NO_COMPRESSION) request = [RequestType::PRODUCE].pack("n") data = request + self.(topic, partition, , compression) return [data.length].pack("N") + data end |