Class: Karafka::Routing::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/routing/topic.rb

Overview

Topic stores all the details on how we should interact with Kafka given topic. It belongs to a consumer group as from 0.6 all the topics can work in the same consumer group It is a part of Karafka’s DSL.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, consumer_group) ⇒ Topic

Returns a new instance of Topic.

Parameters:



26
27
28
29
30
31
32
33
34
35
# File 'lib/karafka/routing/topic.rb', line 26

def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  @active = true
  # @note We use identifier related to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances with the same process and we can
  #   have same topic name across multiple consumer groups
  @id = "#{consumer_group.id}_#{@name}"
end

Instance Attribute Details

#consumerClass

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/karafka/routing/topic.rb', line 52

def consumer
  if Karafka::App.config.consumer_persistence
    # When persistence of consumers is on, no need to reload them
    @consumer
  else
    # In order to support code reload without having to change the topic api, we re-fetch the
    # class of a consumer based on its class name. This will support all the cases where the
    # consumer class is defined with a name. It won't support code reload for anonymous
    # consumer classes, but this is an edge case
    begin
      ::Object.const_get(@consumer.to_s)
    rescue NameError
      # It will only fail if the in case of anonymous classes
      @consumer
    end
  end
end

#consumer_groupObject (readonly)

Returns the value of attribute consumer_group.



9
10
11
# File 'lib/karafka/routing/topic.rb', line 9

def consumer_group
  @consumer_group
end

#idObject (readonly)

Returns the value of attribute id.



9
10
11
# File 'lib/karafka/routing/topic.rb', line 9

def id
  @id
end

#nameObject (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/karafka/routing/topic.rb', line 9

def name
  @name
end

#subscription_groupObject

Returns the value of attribute subscription_group.



11
12
13
# File 'lib/karafka/routing/topic.rb', line 11

def subscription_group
  @subscription_group
end

Instance Method Details

#active(active) ⇒ Object

Allows to disable topic by invoking this method and setting it to ‘false`.

Parameters:

  • active (Boolean)

    should this topic be consumed or not



72
73
74
# File 'lib/karafka/routing/topic.rb', line 72

def active(active)
  @active = active
end

#active?Boolean

Returns should this topic be in use.

Returns:

  • (Boolean)

    should this topic be in use



86
87
88
89
90
91
92
93
94
# File 'lib/karafka/routing/topic.rb', line 86

def active?
  # Never active if disabled via routing
  return false unless @active

  topics = Karafka::App.config.internal.routing.active.topics

  # When empty it means no topics were specified, hence all should be used
  topics.empty? || topics.include?(name)
end

#consumer_classClass

Note:

This is just an alias to the ‘#consumer` method. We however want to use it internally instead of referencing the `#consumer`. We use this to indicate that this method returns class and not an instance. In the routing we want to keep the `#consumer Consumer` routing syntax, but for references outside, we should use this one.

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



81
82
83
# File 'lib/karafka/routing/topic.rb', line 81

def consumer_class
  consumer
end

#to_hHash

Note:

This is being used when we validate the consumer_group and its topics

Returns hash with all the topic attributes.

Returns:

  • (Hash)

    hash with all the topic attributes



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/karafka/routing/topic.rb', line 98

def to_h
  map = INHERITABLE_ATTRIBUTES.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    name: name,
    active: active?,
    consumer: consumer,
    consumer_group_id: consumer_group.id,
    subscription_group: subscription_group
  ).freeze
end