Class: Kafka::Producer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/producer.rb

Constant Summary collapse

PRODUCE_REQUEST_ID =
Kafka::RequestType::PRODUCE

Instance Attribute Summary collapse

Attributes included from IO

#host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Producer

Returns a new instance of Producer.



10
11
12
13
14
15
16
# File 'lib/kafka/producer.rb', line 10

# Configures the producer and connects to the broker right away.
#
# Any option that is missing (or nil/false) falls back to its default.
#
# @param options [Hash] optional settings
# @option options [String]  :topic     topic to publish to (default "test")
# @option options [Integer] :partition partition to publish to (default 0)
# @option options [String]  :host      broker host (default "localhost")
# @option options [Integer] :port      broker port (default 9092)
def initialize(options = {})
  self.topic = options[:topic] || "test"
  self.partition = options[:partition] || 0
  self.host = options[:host] || "localhost"
  self.port = options[:port] || 9092

  # The connection is opened as part of construction, using the values just stored.
  connect(host, port)
end

Instance Attribute Details

#partition ⇒ Object

Returns the value of attribute partition.



8
9
10
# File 'lib/kafka/producer.rb', line 8

# Reader for the partition attribute.
#
# @return [Object] the current value of @partition
def partition; @partition; end

#topic ⇒ Object

Returns the value of attribute topic.



8
9
10
# File 'lib/kafka/producer.rb', line 8

# Reader for the topic attribute.
#
# @return [Object] the current value of @topic
def topic; @topic; end

Instance Method Details

#batch(&block) ⇒ Object



42
43
44
45
46
47
# File 'lib/kafka/producer.rb', line 42

# Collects messages into a Kafka::Batch inside the given block and sends
# them all with a single call to #send.
#
# @yieldparam batch [Kafka::Batch] the batch the block should fill
# @raise [ArgumentError] if no block is given
# @return [Object] the result of clearing the batch's message list
def batch(&block)
  # Fail fast with a clear error instead of a NoMethodError on nil later on.
  raise ArgumentError, "Kafka::Producer#batch requires a block" unless block

  pending = Kafka::Batch.new
  block.call(pending)

  # `send` here is this class's own #send (the produce call), not Object#send.
  self.send(pending.messages)
  pending.messages.clear
end

#encode(message) ⇒ Object



18
19
20
# File 'lib/kafka/producer.rb', line 18

# Serializes a single message as: magic (1 unsigned byte), checksum
# (4 bytes, big-endian), then the raw payload bytes.
#
# Everything is packed in one step so the result is always a binary string.
# Concatenating packed binary data with a UTF-8 payload via `+` raises
# Encoding::CompatibilityError when both sides contain non-ASCII bytes.
#
# @param message [#magic, #calculate_checksum, #payload] message to encode
# @return [String] the encoded message as a binary string
def encode(message)
  [message.magic, message.calculate_checksum, message.payload.to_s].pack("CNa*")
end

#encode_request(topic, partition, messages) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kafka/producer.rb', line 22

def encode_request(topic, partition, messages)
  message_set = Array(messages).collect { |message|
    encoded_message = self.encode(message)
    [encoded_message.length].pack("N") + encoded_message
  }.join("")

  request   = [PRODUCE_REQUEST_ID].pack("n")
  topic     = [topic.length].pack("n") + topic
  partition = [partition].pack("N")
  messages  = [message_set.length].pack("N") + message_set

  data = request + topic + partition + messages

  return [data.length].pack("N") + data
end

#send(messages) ⇒ Object



38
39
40
# File 'lib/kafka/producer.rb', line 38

# Encodes the given messages as one produce request for this producer's
# topic and partition, then writes the request out via #write.
#
# NOTE: because this method is named `send`, it replaces the reflective
# Object#send for Producer instances; use `__send__` for dynamic dispatch.
#
# @param messages [Object, Array] one message or a list of messages
# @return [Object] whatever #write returns
def send(messages)
  request = encode_request(topic, partition, messages)
  write(request)
end