Class: MQBench::Kafka

Inherits:
Client
  • Object
show all
Defined in:
lib/mqbench/kafka.rb

Constant Summary

Constants inherited from Client

Client::QNAME

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Kafka

Returns a new instance of Kafka.



5
6
7
8
9
10
11
12
# File 'lib/mqbench/kafka.rb', line 5

# Builds a Kafka benchmark client.
#
# The default host and port are assigned before `super` runs and are read
# back afterwards to build the seed broker address.
# NOTE(review): presumably Client#initialize may overwrite @host/@port from
# `args` -- confirm against the parent class.
#
# @param args forwarded unchanged to the parent constructor
def initialize(args)
  @port, @host = 9092, 'localhost'

  super(args)

  seed_address = "#{@host}:#{@port}"
  @broker = ::Kafka.new(seed_brokers: [seed_address])
end

Instance Method Details

#recv_msg ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/mqbench/kafka.rb', line 27

# Consumes messages from the QNAME topic until @count messages have been
# received, then breaks out of the consumer loop.
#
# The counter starts at zero and is checked after each message, so exactly
# @count messages are consumed (matching the number `send_msg` publishes).
# When @count is zero or negative the method returns without subscribing,
# because waiting for a first message could block forever.
#
# @return [nil]
def recv_msg
  return if @count <= 0

  consumer = @broker.consumer(group_id: 'test')

  # It's possible to subscribe to multiple topics by calling `subscribe`
  # repeatedly.
  consumer.subscribe(QNAME)

  # `each_message` loops indefinitely, yielding each message in turn, so the
  # block has to break out once enough messages have been seen.
  received = 0
  consumer.each_message do |_message|
    received += 1
    break if received >= @count
  end
end

#send_msg ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/mqbench/kafka.rb', line 14

# Publishes @count messages of @size bytes each (the letter 'a' repeated) to
# the QNAME topic, delivering after every single message.
#
# The producer is created with `required_acks: 0` and buffer limits derived
# from @count and @size. It is always shut down, even when producing or
# delivering raises; the error still propagates to the caller.
#
# @return [void] the return value is not meaningful
def send_msg
  producer = @broker.producer(required_acks: 0,
                              max_buffer_size: (@count * @size),
                              max_buffer_bytesize: (@count * (@size + 100)))

  # The payload is identical for every message, so build it once.
  payload = 'a' * @size

  begin
    1.upto(@count) do
      producer.produce(payload, topic: QNAME)
      producer.deliver_messages
    end
  ensure
    # Runs on both the success and the failure path so the producer is
    # never left open.
    producer.shutdown
  end
end