Class: Karafka::Routing::Topic
- Inherits:
-
Object
- Object
- Karafka::Routing::Topic
- Defined in:
- lib/karafka/routing/topic.rb
Overview
Topic stores all the details on how we should interact with Kafka given topic. It belongs to a consumer group as from 0.6 all the topics can work in the same consumer group It is a part of Karafka’s DSL.
Instance Attribute Summary collapse
-
#consumer ⇒ Class
Consumer class that we should use.
-
#consumer_group ⇒ Object
readonly
Returns the value of attribute consumer_group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#subscription_group ⇒ Object
Returns the value of attribute subscription_group.
Instance Method Summary collapse
-
#active(active) ⇒ Object
Allows to disable topic by invoking this method and setting it to ‘false`.
-
#active? ⇒ Boolean
Should this topic be in use.
-
#consumer_class ⇒ Class
Consumer class that we should use.
-
#initialize(name, consumer_group) ⇒ Topic
constructor
A new instance of Topic.
-
#to_h ⇒ Hash
Hash with all the topic attributes.
Constructor Details
#initialize(name, consumer_group) ⇒ Topic
Returns a new instance of Topic.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/karafka/routing/topic.rb', line 26 def initialize(name, consumer_group) @name = name.to_s @consumer_group = consumer_group @attributes = {} @active = true # @note We use identifier related to the consumer group that owns a topic, because from # Karafka 0.6 we can handle multiple Kafka instances with the same process and we can # have same topic name across multiple consumer groups @id = "#{consumer_group.id}_#{@name}" end |
Instance Attribute Details
#consumer ⇒ Class
Returns consumer class that we should use.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/karafka/routing/topic.rb', line 52 def consumer if Karafka::App.config.consumer_persistence # When persistence of consumers is on, no need to reload them @consumer else # In order to support code reload without having to change the topic api, we re-fetch the # class of a consumer based on its class name. This will support all the cases where the # consumer class is defined with a name. It won't support code reload for anonymous # consumer classes, but this is an edge case begin ::Object.const_get(@consumer.to_s) rescue NameError # It will only fail if the in case of anonymous classes @consumer end end end |
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
9 10 11 |
# File 'lib/karafka/routing/topic.rb', line 9 def consumer_group @consumer_group end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
9 10 11 |
# File 'lib/karafka/routing/topic.rb', line 9 def id @id end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
9 10 11 |
# File 'lib/karafka/routing/topic.rb', line 9 def name @name end |
#subscription_group ⇒ Object
Returns the value of attribute subscription_group.
11 12 13 |
# File 'lib/karafka/routing/topic.rb', line 11 def subscription_group @subscription_group end |
Instance Method Details
#active(active) ⇒ Object
Allows to disable topic by invoking this method and setting it to ‘false`.
72 73 74 |
# File 'lib/karafka/routing/topic.rb', line 72 def active(active) @active = active end |
#active? ⇒ Boolean
Returns should this topic be in use.
86 87 88 89 90 91 92 93 94 |
# File 'lib/karafka/routing/topic.rb', line 86 def active? # Never active if disabled via routing return false unless @active topics = Karafka::App.config.internal.routing.active.topics # When empty it means no topics were specified, hence all should be used topics.empty? || topics.include?(name) end |
#consumer_class ⇒ Class
This is just an alias to the ‘#consumer` method. We however want to use it internally instead of referencing the `#consumer`. We use this to indicate that this method returns class and not an instance. In the routing we want to keep the `#consumer Consumer` routing syntax, but for references outside, we should use this one.
Returns consumer class that we should use.
81 82 83 |
# File 'lib/karafka/routing/topic.rb', line 81 def consumer_class consumer end |
#to_h ⇒ Hash
This is being used when we validate the consumer_group and its topics
Returns hash with all the topic attributes.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/karafka/routing/topic.rb', line 98 def to_h map = INHERITABLE_ATTRIBUTES.map do |attribute| [attribute, public_send(attribute)] end Hash[map].merge!( id: id, name: name, active: active?, consumer: consumer, consumer_group_id: consumer_group.id, subscription_group: subscription_group ).freeze end |