Module: Karafka::Routing::Router

Defined in:
lib/karafka/routing/router.rb

Overview

Note:

Since Kafka does not provide namespaces or modules for topics, they all have “flat” structure so all the routes are being stored in a single level array

Karafka framework Router for routing incoming messages to proper consumers

Class Method Summary collapse

Class Method Details

.find_by(lookup) ⇒ Karafka::Routing::Topic?

Finds first reference of a given topic based on provided lookup attribute

Parameters:

  • lookup (Hash<Symbol, String>)

    hash with attribute - value key pairs

Returns:



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/karafka/routing/router.rb', line 13

def find_by(lookup)
  App.consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      return topic if lookup.all? do |attribute, value|
        topic.public_send(attribute) == value
      end
    end
  end

  nil
end

.find_or_initialize_by_name(name) ⇒ Karafka::Routing::Topic

Note:

Please note, that in case of a new topic, it will have a newly built consumer group as well, that is not part of the routing.

Finds the topic by name (in any consumer group) and if not present, will built a new representation of the topic with the defaults and default deserializers.

This is used in places where we may operate on topics that are not part of the routing but we want to do something on them (display data, iterate over, etc)

Parameters:

  • name (String)

    name of the topic we are looking for

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/karafka/routing/router.rb', line 35

def find_or_initialize_by_name(name)
  existing_topic = find_by(name: name)

  return existing_topic if existing_topic

  virtual_topic = Topic.new(name, ConsumerGroup.new(name))

  Karafka::Routing::Proxy.new(
    virtual_topic,
    Karafka::App.config.internal.routing.builder.defaults
  ).target
end