Class: Kafka::Producer
Constant Summary collapse
- PRODUCE_REQUEST_ID =
Kafka::RequestType::PRODUCE
Instance Attribute Summary collapse
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes included from IO
Instance Method Summary collapse
- #batch(&block) ⇒ Object
- #encode(message) ⇒ Object
- #encode_request(topic, partition, messages) ⇒ Object
-
#initialize(options = {}) ⇒ Producer
constructor
A new instance of Producer.
- #send(messages) ⇒ Object
Methods included from IO
#connect, #disconnect, #read, #reconnect, #write
Constructor Details
#initialize(options = {}) ⇒ Producer
Returns a new instance of Producer.
10 11 12 13 14 15 16 |
# File 'lib/kafka/producer.rb', line 10
def initialize(options = {})
  self.topic = options[:topic] || "test"
  self.partition = options[:partition] || 0
  self.host = options[:host] || "localhost"
  self.port = options[:port] || 9092
  self.connect(self.host, self.port)
end
Instance Attribute Details
#partition ⇒ Object
Returns the value of attribute partition.
8 9 10 |
# File 'lib/kafka/producer.rb', line 8
def partition
  @partition
end
#topic ⇒ Object
Returns the value of attribute topic.
8 9 10 |
# File 'lib/kafka/producer.rb', line 8
def topic
  @topic
end
Instance Method Details
#batch(&block) ⇒ Object
42 43 44 45 46 47 |
# File 'lib/kafka/producer.rb', line 42
def batch(&block)
  batch = Kafka::Batch.new
  block.call( batch )
  self.send(batch.messages)
  batch.messages.clear
end
#encode(message) ⇒ Object
18 19 20 |
# File 'lib/kafka/producer.rb', line 18
def encode(message)
  [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
end
#encode_request(topic, partition, messages) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/kafka/producer.rb', line 22
def encode_request(topic, partition, messages)
  message_set = Array(messages).collect { |message|
    encoded_message = self.encode(message)
    [encoded_message.length].pack("N") + encoded_message
  }.join("")

  request = [PRODUCE_REQUEST_ID].pack("n")
  topic = [topic.length].pack("n") + topic
  partition = [partition].pack("N")
  messages = [message_set.length].pack("N") + message_set

  data = request + topic + partition + messages

  return [data.length].pack("N") + data
end
#send(messages) ⇒ Object
38 39 40 |
# File 'lib/kafka/producer.rb', line 38
def send(messages)
  self.write(self.encode_request(self.topic, self.partition, messages))
end