Class: Karafka::Routing::SubscriptionGroup
- Inherits:
- Object
- Defined in:
- lib/karafka/routing/subscription_group.rb
Overview
Object representing a set of topics from a single consumer group that can be subscribed to together over one connection.
One subscription group always belongs to exactly one consumer group, but one consumer group can have multiple subscription groups.
Instance Attribute Summary
-
#consumer_group ⇒ Object
readonly
Returns the value of attribute consumer_group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#kafka ⇒ Object
readonly
Returns the value of attribute kafka.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Class Method Summary
-
.id ⇒ String
Generates a new subscription group id, used for anonymous subscription groups.
Instance Method Summary
-
#active? ⇒ Boolean
Whether this subscription group is one of the active ones.
-
#consumer_group_id ⇒ String
Consumer group id.
-
#initialize(position, topics) ⇒ SubscriptionGroup
constructor
Built subscription group.
-
#max_messages ⇒ Integer
Max messages fetched in a single go.
-
#max_wait_time ⇒ Integer
Max milliseconds we can wait for incoming messages.
-
#subscriptions ⇒ Array<String>
Names of topics to which we should subscribe.
-
#to_s ⇒ String
Id of the subscription group.
Constructor Details
#initialize(position, topics) ⇒ SubscriptionGroup
Returns built subscription group.
# File 'lib/karafka/routing/subscription_group.rb', line 34

def initialize(position, topics)
  @name = topics.first.subscription_group_name
  @consumer_group = topics.first.consumer_group
  # We include the consumer group id here because we want to have unique ids of subscription
  # groups across the system. Otherwise user could set the same name for multiple
  # subscription groups in many consumer groups effectively having same id for different
  # entities
  @id = "#{@consumer_group.id}_#{@name}_#{position}"
  @position = position
  @topics = topics
  @kafka = build_kafka
  freeze
end
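The comment in the constructor explains why the consumer group id is part of the subscription group id. A minimal sketch of that reasoning follows. `ConsumerGroupStub`, `NamedTopicStub`, and `build_subscription_group_id` are hypothetical stand-ins introduced for illustration only; they are not part of Karafka.

```ruby
# Hypothetical stand-ins for the routing objects; not Karafka's real classes.
ConsumerGroupStub = Struct.new(:id)
NamedTopicStub = Struct.new(:subscription_group_name, :consumer_group)

# Mirrors the id interpolation used in the constructor:
# "<consumer group id>_<subscription group name>_<position>"
def build_subscription_group_id(position, topics)
  first = topics.first
  "#{first.consumer_group.id}_#{first.subscription_group_name}_#{position}"
end

# Two consumer groups that both use the subscription group name "shared".
orders = NamedTopicStub.new('shared', ConsumerGroupStub.new('orders_app'))
billing = NamedTopicStub.new('shared', ConsumerGroupStub.new('billing_app'))

orders_id = build_subscription_group_id(0, [orders])
billing_id = build_subscription_group_id(0, [billing])

puts orders_id  # orders_app_shared_0
puts billing_id # billing_app_shared_0
```

Even with the same name and position, the two ids differ because each one is prefixed with its own consumer group id.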
Instance Attribute Details
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
# File 'lib/karafka/routing/subscription_group.rb', line 11

def consumer_group
  @consumer_group
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/karafka/routing/subscription_group.rb', line 11

def id
  @id
end
#kafka ⇒ Object (readonly)
Returns the value of attribute kafka.
# File 'lib/karafka/routing/subscription_group.rb', line 11

def kafka
  @kafka
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/karafka/routing/subscription_group.rb', line 11

def name
  @name
end
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
# File 'lib/karafka/routing/subscription_group.rb', line 11

def topics
  @topics
end
Class Method Details
.id ⇒ String
Generates a new subscription group id, used for anonymous subscription groups.
# File 'lib/karafka/routing/subscription_group.rb', line 22

def id
  ::Digest::MD5.hexdigest(
    GROUP_COUNT.increment.to_s
  )[0..11]
end
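The method above hashes an incrementing counter and keeps the first 12 hexadecimal characters of the digest. The sketch below reproduces that idea in isolation. `CounterStub`, `ANONYMOUS_COUNTER`, and `anonymous_group_id` are hypothetical names; in particular, the stub only assumes that `GROUP_COUNT.increment` returns the incremented value, which this page does not show.

```ruby
require 'digest/md5'

# Hypothetical thread-safe counter standing in for GROUP_COUNT.
# Assumption: increment returns the new value.
class CounterStub
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # Increments the counter under a lock and returns the new value.
  def increment
    @mutex.synchronize { @value += 1 }
  end
end

ANONYMOUS_COUNTER = CounterStub.new

# MD5 of the stringified counter, truncated to the first 12 hex characters.
def anonymous_group_id
  ::Digest::MD5.hexdigest(ANONYMOUS_COUNTER.increment.to_s)[0..11]
end

first_id = anonymous_group_id
second_id = anonymous_group_id

puts first_id
puts second_id
```

Since the input is a counter rather than random data, the generated ids are deterministic for a given call order within a process.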
Instance Method Details
#active? ⇒ Boolean
Returns whether this subscription group is one of the active ones.
# File 'lib/karafka/routing/subscription_group.rb', line 64

def active?
  Karafka::App.config.internal.routing.activity_manager.active?(:subscription_groups, name)
end
#consumer_group_id ⇒ String
Returns consumer group id.
# File 'lib/karafka/routing/subscription_group.rb', line 49

def consumer_group_id
  kafka[:'group.id']
end
#max_messages ⇒ Integer
Returns max messages fetched in a single go.
# File 'lib/karafka/routing/subscription_group.rb', line 54

def max_messages
  @topics.first.max_messages
end
#max_wait_time ⇒ Integer
Returns max milliseconds we can wait for incoming messages.
# File 'lib/karafka/routing/subscription_group.rb', line 59

def max_wait_time
  @topics.first.max_wait_time
end
#subscriptions ⇒ Array<String>
Most of the time the group should not include inactive topics. With pattern matching, however, the matcher topics become inactive down the road, so inactive topics are filtered out here and later removed.
Returns names of topics to which we should subscribe.
# File 'lib/karafka/routing/subscription_group.rb', line 73

def subscriptions
  topics.select(&:active?).map(&:subscription_name)
end
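As a rough illustration of the filtering described above, the sketch below applies the same `select` and `map` chain to plain structs. `SubscribableTopicStub` and the topic names are hypothetical; real routing topics expose more than these two members.

```ruby
# Hypothetical stand-in exposing only what the filter needs.
SubscribableTopicStub = Struct.new(:subscription_name, :active) do
  # Predicate used by select(&:active?).
  def active?
    active
  end
end

topics = [
  SubscribableTopicStub.new('orders', true),
  SubscribableTopicStub.new('payments', true),
  # An inactive entry, standing in for a matcher topic that became inactive.
  SubscribableTopicStub.new('matcher_placeholder', false)
]

# Same chain as in #subscriptions: keep active topics, then take their names.
subscriptions = topics.select(&:active?).map(&:subscription_name)

puts subscriptions.inspect # ["orders", "payments"]
```

Only the names of active topics are returned, in their original order.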
#to_s ⇒ String
This is an alias for displaying in places where we print the stringified version.
Returns id of the subscription group.
# File 'lib/karafka/routing/subscription_group.rb', line 79

def to_s
  id
end