Class: Karafka::Routing::SubscriptionGroup
- Inherits: Object
- Defined in: lib/karafka/routing/subscription_group.rb
Overview
Represents a set of topics from a single consumer group that can be subscribed to together over one connection.
A subscription group always belongs to exactly one consumer group, but one consumer group can have multiple subscription groups.
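The one-to-many relationship described above can be sketched with plain Ruby structures. This is only an illustration under stated assumptions, not Karafka's routing code: `OverviewTopic`, the topic names, and the grouping key are hypothetical stand-ins chosen for the example.

```ruby
# Hypothetical stand-in for a routed topic; not a Karafka class.
OverviewTopic = Struct.new(:name, :consumer_group, :subscription_group)

topics = [
  OverviewTopic.new('orders', 'app', 'critical'),
  OverviewTopic.new('payments', 'app', 'critical'),
  OverviewTopic.new('logs', 'app', 'bulk')
]

# Assumption for this sketch: topics that share a consumer group and a
# subscription group name are subscribed together over one connection.
grouped = topics.group_by { |t| [t.consumer_group, t.subscription_group] }

# One consumer group ("app") ends up with two subscription groups.
grouped.each do |(consumer_group, subscription_group), members|
  puts "#{consumer_group}/#{subscription_group}: #{members.map(&:name).join(', ')}"
end
```

Every group in the result belongs to exactly one consumer group, while the single consumer group here owns more than one group.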
Instance Attribute Summary
-
#consumer_group ⇒ Object
readonly
Returns the value of attribute consumer_group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#kafka ⇒ Object
readonly
Returns the value of attribute kafka.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Class Method Summary
-
.id ⇒ String
Generates a new subscription group id that will be used for anonymous subscription groups.
Instance Method Summary
-
#active? ⇒ Boolean
Whether this subscription group is one of the active ones.
-
#consumer_group_id ⇒ String
Consumer group id.
-
#initialize(position, topics) ⇒ SubscriptionGroup
constructor
Built subscription group.
-
#max_messages ⇒ Integer
Max messages fetched in a single go.
-
#max_wait_time ⇒ Integer
Max milliseconds we can wait for incoming messages.
Constructor Details
#initialize(position, topics) ⇒ SubscriptionGroup
Returns built subscription group.
# File 'lib/karafka/routing/subscription_group.rb', line 34
def initialize(position, topics)
  @name = topics.first.subscription_group
  @consumer_group = topics.first.consumer_group
  @id = "#{@name}_#{position}"
  @position = position
  @topics = topics
  @kafka = build_kafka
  freeze
end
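The constructor's behavior (name and consumer group taken from the first topic, id composed from name and position, instance frozen afterwards) can be tried out with a simplified stand-in. `CtorTopic` and `SketchSubscriptionGroup` are hypothetical names, and `build_kafka` is replaced by a fixed hash because its real logic is not shown on this page.

```ruby
# Hypothetical stand-in for a routed topic; only the two readers used here.
CtorTopic = Struct.new(:subscription_group, :consumer_group)

# Simplified sketch of the constructor shown above.
class SketchSubscriptionGroup
  attr_reader :id, :name, :consumer_group, :topics, :kafka

  def initialize(position, topics)
    @name = topics.first.subscription_group
    @consumer_group = topics.first.consumer_group
    @id = "#{@name}_#{position}"
    @position = position
    @topics = topics
    # Assumption: a fixed hash stands in for the result of build_kafka.
    @kafka = { 'group.id': 'example_group' }
    # After this call, instance variables can no longer be reassigned.
    freeze
  end
end

group = SketchSubscriptionGroup.new(0, [CtorTopic.new('critical', 'app')])
puts group.id      # critical_0
puts group.frozen? # true
```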
Instance Attribute Details
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
# File 'lib/karafka/routing/subscription_group.rb', line 11
def consumer_group
  @consumer_group
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/karafka/routing/subscription_group.rb', line 11
def id
  @id
end
#kafka ⇒ Object (readonly)
Returns the value of attribute kafka.
# File 'lib/karafka/routing/subscription_group.rb', line 11
def kafka
  @kafka
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/karafka/routing/subscription_group.rb', line 11
def name
  @name
end
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
# File 'lib/karafka/routing/subscription_group.rb', line 11
def topics
  @topics
end
Class Method Details
.id ⇒ String
Generates a new subscription group id that will be used for anonymous subscription groups.
# File 'lib/karafka/routing/subscription_group.rb', line 22
def id
  ::Digest::MD5.hexdigest(
    GROUP_COUNT.increment.to_s
  )[0..11]
end
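The id scheme above (increment a shared counter, hash its string form with MD5, keep the first 12 hex characters) can be reproduced in isolation. In this sketch `SketchCounter`, `SKETCH_GROUP_COUNT`, and `sketch_id` are hypothetical names, and a mutex-guarded integer is assumed in place of `GROUP_COUNT`, whose actual type is not shown on this page.

```ruby
require 'digest/md5'

# Assumption: a mutex-guarded integer stands in for GROUP_COUNT.
class SketchCounter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # Atomically bumps the counter and returns the new value.
  def increment
    @mutex.synchronize { @value += 1 }
  end
end

SKETCH_GROUP_COUNT = SketchCounter.new

# Hashes the next counter value and keeps the first 12 hex characters.
def sketch_id
  ::Digest::MD5.hexdigest(SKETCH_GROUP_COUNT.increment.to_s)[0..11]
end

first = sketch_id
second = sketch_id
puts first
puts second
```

Because the input is a counter, the ids are deterministic for a given call order within a process rather than random.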
Instance Method Details
#active? ⇒ Boolean
Returns whether this subscription group is one of the active ones.
# File 'lib/karafka/routing/subscription_group.rb', line 60
def active?
  sgs = Karafka::App.config.internal.routing.active.subscription_groups

  # When empty it means no groups were specified, hence all should be used
  sgs.empty? || sgs.include?(name)
end
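The predicate's rule (an empty list of active groups means every group is active) can be checked without a Karafka application. `sketch_active?` is a hypothetical helper, and `active_names` plays the role of the configured list of active subscription group names.

```ruby
# Hypothetical helper mirroring the predicate above.
# active_names: names of subscription groups selected as active.
# name: the name of the subscription group being checked.
def sketch_active?(active_names, name)
  # An empty list means nothing was selected, so every group counts as active.
  active_names.empty? || active_names.include?(name)
end

puts sketch_active?([], 'critical')           # true
puts sketch_active?(%w[critical], 'critical') # true
puts sketch_active?(%w[critical], 'bulk')     # false
```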
#consumer_group_id ⇒ String
Returns consumer group id.
# File 'lib/karafka/routing/subscription_group.rb', line 45
def consumer_group_id
  kafka[:'group.id']
end
#max_messages ⇒ Integer
Returns max messages fetched in a single go.
# File 'lib/karafka/routing/subscription_group.rb', line 50
def max_messages
  @topics.first.max_messages
end
#max_wait_time ⇒ Integer
Returns max milliseconds we can wait for incoming messages.
# File 'lib/karafka/routing/subscription_group.rb', line 55
def max_wait_time
  @topics.first.max_wait_time
end