Module: FlippRubyKafka::TestHelpers
- Defined in: lib/flipp_ruby_kafka/test_helpers.rb
Overview
:nodoc:
Instance Method Summary
- #kafkateria_delete_topic(topic) ⇒ Object
  Use Kafkateria to clear a topic.
- #kafkateria_get_messages(topic:, num_messages: 10) ⇒ Array<Hash>
  Use Kafkateria to fetch messages from a topic; each hash has :key and :payload keys.
- #kafkateria_produce_messages(topic:, num_messages: 1, traits: [], trait_params: {}, values: {}, null_payload: false, key: nil) ⇒ Object
  Use Kafkateria to generate and produce messages.
Instance Method Details
#kafkateria_delete_topic(topic) ⇒ Object
Use Kafkateria to clear a topic.
    # File 'lib/flipp_ruby_kafka/test_helpers.rb', line 11
    def kafkateria_delete_topic(topic)
      base = FlippRubyKafka.config.kafkateria_url
      uri = URI("#{base}/v1/topics/delete_topic?topic=#{topic}")
      request = Net::HTTP::Post.new(uri)
      http = Net::HTTP.new(uri.host, uri.port)
      http.request(request)
    end
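The listing above interpolates the topic name directly into the query string. The following is a minimal sketch of building the same delete URL with the topic escaped, using only the Ruby standard library. The helper name `delete_topic_uri`, the base URL `http://localhost:3000`, and the topic `my topic` are hypothetical and are not part of FlippRubyKafka.

```ruby
require 'uri'

# Hypothetical helper (not part of FlippRubyKafka): builds the delete-topic
# URL with the topic name escaped for use in a query string.
def delete_topic_uri(base, topic)
  query = URI.encode_www_form(topic: topic)
  URI("#{base}/v1/topics/delete_topic?#{query}")
end

# Assumed base URL and topic name, for illustration only.
uri = delete_topic_uri('http://localhost:3000', 'my topic')
puts uri
```

Escaping only matters for topic names that contain characters with special meaning in a URL, such as spaces or `&`; plain names pass through unchanged.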
#kafkateria_get_messages(topic:, num_messages: 10) ⇒ Array<Hash>
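Use Kafkateria to fetch messages from a topic. Returns an array of hashes, each with :key and :payload keys; if the response contains no payloads, the parsed response is returned as-is.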
    # File 'lib/flipp_ruby_kafka/test_helpers.rb', line 43
    def kafkateria_get_messages(topic:, num_messages: 10)
      base = FlippRubyKafka.config.kafkateria_url
      uri = URI("#{base}//v1/messages?topic=#{topic}&num_messages=#{num_messages}")
      res = Net::HTTP.get_response(uri)
      if res.code.to_s != '200'
        raise "Failed #{res.code} getting messages from #{topic}: #{res.body}"
      end

      response = JSON.parse(res.body)
      if response['payloads'].present?
        response['payloads'].map(&:with_indifferent_access)
      else
        response
      end
    end
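The response handling above depends on ActiveSupport (`present?` and `with_indifferent_access`). As a rough sketch of the same branch using only the standard library, the hypothetical helper `extract_messages` below returns the payload list when one is present and the parsed response otherwise. It symbolizes only the top-level keys of each message, which is narrower than `with_indifferent_access`, and the sample response body is made up for illustration.

```ruby
require 'json'

# Hypothetical helper (not part of FlippRubyKafka): mirrors the branch in
# kafkateria_get_messages using only the standard library.
def extract_messages(body)
  parsed = JSON.parse(body)
  payloads = parsed['payloads']
  # No usable 'payloads' entry: hand back the parsed response unchanged.
  return parsed if payloads.nil? || payloads.empty?

  # Symbolize top-level keys so each message can be read via :key and :payload.
  payloads.map { |message| message.transform_keys(&:to_sym) }
end

# Made-up response body, for illustration only.
body = '{"payloads":[{"key":"abc","payload":{"id":1}}]}'
messages = extract_messages(body)
puts messages.first[:key]
```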
#kafkateria_produce_messages(topic:, num_messages: 1, traits: [], trait_params: {}, values: {}, null_payload: false, key: nil) ⇒ Object
Use Kafkateria to generate and produce messages.
    # File 'lib/flipp_ruby_kafka/test_helpers.rb', line 24
    def kafkateria_produce_messages(topic:, num_messages: 1, traits: [], trait_params: {},
                                    values: {}, null_payload: false, key: nil)
      base = FlippRubyKafka.config.kafkateria_url
      uri = URI("#{base}//v1/messages/generate_and_publish")
      req = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/json')
      req.body = { topic: topic, values: values, traits: traits, trait_params: trait_params,
                   num_messages: num_messages, null_payload: null_payload, key: key }.to_json
      response = Net::HTTP.start(uri.hostname, uri.port) do |http|
        http.request(req)
      end
      if response.code.to_s != '200'
        raise "Failed #{response.code} producing messages to #{topic}: #{response.body}"
      end
      JSON.parse(response.body || [])
    end
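To see what the method above sends, the sketch below builds an equivalent request with the standard library and prints its JSON body without contacting any server. The base URL `http://localhost:3000`, the topic `my-topic`, and the `values` hash are assumptions chosen for illustration; in the real helper they come from `FlippRubyKafka.config.kafkateria_url` and the keyword arguments.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Assumed Kafkateria base URL, for illustration only.
base = 'http://localhost:3000'
uri = URI("#{base}/v1/messages/generate_and_publish")

# Build the request object; nothing is sent over the network here.
req = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/json')
req.body = {
  topic: 'my-topic',      # hypothetical topic name
  values: { id: 1 },      # hypothetical field override
  traits: [],
  trait_params: {},
  num_messages: 3,
  null_payload: false,
  key: nil
}.to_json

puts req.body
```

Each keyword argument of `kafkateria_produce_messages` maps to one field of this body, so the defaults in the method signature are what Kafkateria receives when an argument is omitted.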