Module: FlippRubyKafka::TestHelpers

Defined in:
lib/flipp_ruby_kafka/test_helpers.rb

Overview

:nodoc:

Instance Method Summary collapse

Instance Method Details

#kafkateria_delete_topic(topic) ⇒ Object

Use Kafkateria to clear a topic.

Parameters:

  • topic (String)


11
12
13
14
15
16
17
# File 'lib/flipp_ruby_kafka/test_helpers.rb', line 11

# Use Kafkateria to clear a topic.
#
# Sends a POST to the `/v1/topics/delete_topic` endpoint of the configured
# Kafkateria URL. The response status is not inspected here.
#
# @param topic [String] name of the topic to clear
# @return [Net::HTTPResponse] whatever the HTTP request returns
def kafkateria_delete_topic(topic)
  # Tolerate a configured base URL that ends with a slash.
  base = FlippRubyKafka.config.kafkateria_url.to_s.chomp('/')
  # Form-encode the topic so reserved characters cannot break or alter the URL.
  query = URI.encode_www_form(topic: topic)
  uri = URI("#{base}/v1/topics/delete_topic?#{query}")
  request = Net::HTTP::Post.new(uri)
  http = Net::HTTP.new(uri.host, uri.port)
  # Net::HTTP.new speaks plain HTTP unless TLS is switched on explicitly.
  http.use_ssl = uri.scheme == 'https'
  http.request(request)
end

#kafkateria_get_messages(topic:, num_messages: 10) ⇒ Array<Hash>

Returns the messages as hashes with :key and :payload keys.

Parameters:

  • topic (String)
  • num_messages (Integer) (defaults to: 10)

Returns:

  • (Array<Hash>)

    with :key and :payload keys



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/flipp_ruby_kafka/test_helpers.rb', line 43

# Fetch messages for a topic from Kafkateria.
#
# Issues a GET to the `/v1/messages` endpoint of the configured Kafkateria URL.
# When the parsed JSON is a hash with a non-blank `payloads` entry, each payload
# is returned with indifferent access; otherwise the parsed JSON is returned
# unchanged.
#
# @param topic [String] topic to read from
# @param num_messages [Integer] sent to the service as the `num_messages` query parameter
# @return [Array<Hash>] the payloads (with indifferent access), or the parsed
#   response itself when it carries no payloads
# @raise [RuntimeError] if the response status is not 200
def kafkateria_get_messages(topic:, num_messages: 10)
  # Tolerate a configured base URL that ends with a slash.
  base = FlippRubyKafka.config.kafkateria_url.to_s.chomp('/')
  # Form-encode the parameters so reserved characters cannot break or alter the URL.
  query = URI.encode_www_form(topic: topic, num_messages: num_messages)
  uri = URI("#{base}/v1/messages?#{query}")
  res = Net::HTTP.get_response(uri)
  unless res.code.to_s == '200'
    raise "Failed #{res.code} getting messages from #{topic}: #{res.body}"
  end

  response = JSON.parse(res.body)
  # Only a JSON object can be indexed by 'payloads'; a top-level array would
  # raise TypeError on a String index, so it is returned as-is instead.
  payloads = response['payloads'] if response.is_a?(Hash)
  return response unless payloads.present?

  payloads.map(&:with_indifferent_access)
end

#kafkateria_produce_messages(topic:, num_messages: 1, traits: [], trait_params: {}, values: {}, null_payload: false, key: nil) ⇒ Object

Use Kafkateria to generate and produce messages.

Parameters:

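  • topic (String)
  • num_messages (Integer) (defaults to: 1)
  • traits (Array<Symbol>) (defaults to: [])
  • trait_params (Hash) (defaults to: {})
  • values (Hash) (defaults to: {})
  • null_payload (Boolean) (defaults to: false)
  • key (defaults to: nil)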


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/flipp_ruby_kafka/test_helpers.rb', line 24

# Use Kafkateria to generate and produce messages.
#
# POSTs a JSON document built from the arguments to the
# `/v1/messages/generate_and_publish` endpoint of the configured Kafkateria URL.
#
# @param topic [String] topic to publish to
# @param num_messages [Integer] forwarded as `num_messages`
# @param traits [Array<Symbol>] forwarded as `traits`
# @param trait_params [Hash] forwarded as `trait_params`
# @param values [Hash] forwarded as `values`
# @param null_payload [Boolean] forwarded as `null_payload`
# @param key [Object, nil] forwarded as `key`
# @return [Object] the parsed JSON response body, or an empty Array when the
#   response has no body
# @raise [RuntimeError] if the response status is not 200
def kafkateria_produce_messages(topic:, num_messages: 1, traits: [], trait_params: {}, values: {}, null_payload: false, key: nil)
  # Tolerate a configured base URL that ends with a slash.
  base = FlippRubyKafka.config.kafkateria_url.to_s.chomp('/')
  uri = URI("#{base}/v1/messages/generate_and_publish")
  req = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/json')
  req.body = {
    topic: topic, values: values, traits: traits, trait_params: trait_params,
    num_messages: num_messages, null_payload: null_payload, key: key
  }.to_json
  # Net::HTTP.start speaks plain HTTP unless use_ssl is passed explicitly.
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(req)
  end
  unless response.code.to_s == '200'
    raise "Failed #{response.code} producing messages to #{topic}: #{response.body}"
  end

  # JSON.parse only accepts a String, so a missing or blank body has to be
  # mapped to the empty result before parsing.
  body = response.body.to_s
  body.strip.empty? ? [] : JSON.parse(body)
end