Class: MQBench::Kafka
Constant Summary
Constants inherited from Client
Instance Method Summary collapse
-
#initialize(args) ⇒ Kafka
constructor
A new instance of Kafka.
- #recv_msg ⇒ Object
- #send_msg ⇒ Object
Constructor Details
#initialize(args) ⇒ Kafka
Returns a new instance of Kafka.
5 6 7 8 9 10 11 12 |
# File 'lib/mqbench/kafka.rb', line 5 def initialize(args) @port = 9092 @host = 'localhost' super(args) @broker = ::Kafka.new(seed_brokers: ["#{@host}:#{@port}"]) end |
Instance Method Details
#recv_msg ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/mqbench/kafka.rb', line 27 def recv_msg consumer = @broker.consumer(group_id: 'test') # It's possible to subscribe to multiple topics by calling `subscribe` # repeatedly. consumer.subscribe(QNAME) # This will loop indefinitely, yielding each message in turn. current = 1 consumer. do || current += 1 if(current >= @count) break end end end |
#send_msg ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/mqbench/kafka.rb', line 14 def send_msg producer = @broker.producer(:required_acks => 0, :max_buffer_size => (@count * @size), :max_buffer_bytesize => (@count * (@size + 100))) (1..@count).each do |x| producer.produce('a' * @size, topic: QNAME) producer. end producer.shutdown end |