Class: Kafka::Producer
Constant Summary collapse
- PRODUCE_REQUEST_ID =
Kafka::RequestType::PRODUCE
Instance Attribute Summary collapse
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes included from IO
Instance Method Summary collapse
- #batch(&block) ⇒ Object
- #encode(message) ⇒ Object
- #encode_request(topic, partition, messages) ⇒ Object
-
#initialize(options = {}) ⇒ Producer
constructor
A new instance of Producer.
- #send(messages) ⇒ Object
Methods included from IO
#connect, #disconnect, #read, #reconnect, #write
Constructor Details
#initialize(options = {}) ⇒ Producer
Returns a new instance of Producer.
24 25 26 27 28 29 30 |
# File 'lib/kafka/producer.rb', line 24 def initialize( = {}) self.topic = [:topic] || "test" self.partition = [:partition] || 0 self.host = [:host] || "localhost" self.port = [:port] || 9092 self.connect(self.host, self.port) end |
Instance Attribute Details
#partition ⇒ Object
Returns the value of attribute partition.
22 23 24 |
# File 'lib/kafka/producer.rb', line 22 def partition @partition end |
#topic ⇒ Object
Returns the value of attribute topic.
22 23 24 |
# File 'lib/kafka/producer.rb', line 22 def topic @topic end |
Instance Method Details
#batch(&block) ⇒ Object
61 62 63 64 65 66 |
# File 'lib/kafka/producer.rb', line 61 def batch(&block) batch = Kafka::Batch.new block.call( batch ) self.send(batch.) batch..clear end |
#encode(message) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/kafka/producer.rb', line 32 def encode() if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding ic = Iconv.new('UTF-8//IGNORE', 'UTF-8') [.magic].pack("C") + [.calculate_checksum].pack("N") + ic.iconv(.payload.to_s) else [.magic].pack("C") + [.calculate_checksum].pack("N") + .payload.to_s.force_encoding(Encoding::ASCII_8BIT) end end |
#encode_request(topic, partition, messages) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/kafka/producer.rb', line 41 def encode_request(topic, partition, ) = Array().collect { || = self.encode() [.length].pack("N") + }.join("") request = [PRODUCE_REQUEST_ID].pack("n") topic = [topic.length].pack("n") + topic partition = [partition].pack("N") = [.length].pack("N") + data = request + topic + partition + return [data.length].pack("N") + data end |
#send(messages) ⇒ Object
57 58 59 |
# File 'lib/kafka/producer.rb', line 57 def send() self.write(self.encode_request(self.topic, self.partition, )) end |