Class: Karafka::Routing::SubscriptionGroup

Inherits:
Object
Defined in:
lib/karafka/routing/subscription_group.rb

Overview

Note:

One subscription group will always belong to one consumer group, but one consumer group can have multiple subscription groups.

Represents a set of topics from a single consumer group that can be subscribed to together over one connection.
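The relationship described above can be sketched in plain Ruby. This is only an illustration, not Karafka's routing code: `OverviewTopic` and the topic and group names are hypothetical stand-ins. It shows one consumer group whose topics fall into two subscription groups, each of which would share one connection.

```ruby
# Hypothetical stand-in for a routing topic; not Karafka's real Topic class.
OverviewTopic = Struct.new(:name, :consumer_group, :subscription_group)

overview_topics = [
  OverviewTopic.new('orders', 'app', 'critical'),
  OverviewTopic.new('payments', 'app', 'critical'),
  OverviewTopic.new('logs', 'app', 'bulk')
]

# Topics with the same consumer group and subscription group name end up
# in the same set, which is what one subscription group represents.
overview_grouped = overview_topics.group_by do |topic|
  [topic.consumer_group, topic.subscription_group]
end

overview_grouped.each do |(consumer_group, subscription_group), members|
  puts "#{consumer_group}/#{subscription_group}: #{members.map(&:name).join(', ')}"
end
```

Every key in `overview_grouped` carries exactly one consumer group, while the single consumer group `app` appears under two subscription groups.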

Constructor Details

#initialize(position, topics) ⇒ SubscriptionGroup

Returns the built subscription group.

Parameters:

  • position (Integer)

    position of this subscription group in the array of all subscription groups. This value is needed for static group memberships, which require an identifier that stays unique and stable between restarts

  • topics (Karafka::Routing::Topics)

    all the topics that share the same key settings



# File 'lib/karafka/routing/subscription_group.rb', line 34

def initialize(position, topics)
  @name = topics.first.subscription_group
  @consumer_group = topics.first.consumer_group
  @id = "#{@name}_#{position}"
  @position = position
  @topics = topics
  @kafka = build_kafka
  freeze
end
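To illustrate why the position makes the id stable between restarts, here is a minimal sketch that mirrors only part of the constructor above. `CtorTopic` and `SketchSubscriptionGroup` are hypothetical names, and the `build_kafka` step is left out because its body is not shown here.

```ruby
# Hypothetical stand-in exposing the two readers the constructor uses.
CtorTopic = Struct.new(:subscription_group, :consumer_group)

# Simplified sketch of the constructor logic; not the real class.
class SketchSubscriptionGroup
  attr_reader :id, :name, :consumer_group, :topics

  def initialize(position, topics)
    @name = topics.first.subscription_group
    @consumer_group = topics.first.consumer_group
    # Same name and same position always produce the same id
    @id = "#{@name}_#{position}"
    @position = position
    @topics = topics
    # The instance cannot be modified after it is built
    freeze
  end
end

ctor_topics = [CtorTopic.new('critical', 'app')]
ctor_first = SketchSubscriptionGroup.new(0, ctor_topics)
ctor_again = SketchSubscriptionGroup.new(0, ctor_topics)

puts ctor_first.id
puts ctor_first.id == ctor_again.id
puts ctor_first.frozen?
```

Because the id depends only on the group name and its position in the routing, rebuilding the same routing after a restart yields the same id.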

Instance Attribute Details

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



# File 'lib/karafka/routing/subscription_group.rb', line 11

def consumer_group
  @consumer_group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/subscription_group.rb', line 11

def id
  @id
end

#kafka ⇒ Object (readonly)

Returns the value of attribute kafka.



# File 'lib/karafka/routing/subscription_group.rb', line 11

def kafka
  @kafka
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/routing/subscription_group.rb', line 11

def name
  @name
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



# File 'lib/karafka/routing/subscription_group.rb', line 11

def topics
  @topics
end

Class Method Details

.id ⇒ String

Generates a new subscription group id that will be used for anonymous subscription groups.

Returns:

  • (String)

    reproducible id in the same format as hex(6) output (12 hexadecimal characters)



# File 'lib/karafka/routing/subscription_group.rb', line 22

def id
  ::Digest::MD5.hexdigest(
    GROUP_COUNT.increment.to_s
  )[0..11]
end
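The following sketch shows why hashing a counter gives ids that are reproducible across process runs. It assumes that `GROUP_COUNT` behaves like a thread-safe counter whose `increment` returns the new value; its definition is not shown on this page, so `SketchCounter` and `sketch_group_id` are hypothetical stand-ins.

```ruby
require 'digest/md5'

# Hypothetical stand-in for GROUP_COUNT: a Mutex-guarded integer whose
# increment method returns the incremented value.
class SketchCounter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @value += 1 }
  end
end

# Hashes the next counter value and keeps the first 12 hex characters.
def sketch_group_id(counter)
  Digest::MD5.hexdigest(counter.increment.to_s)[0..11]
end

# Two counters simulate two separate runs of the same process.
run_a = SketchCounter.new
run_b = SketchCounter.new
ids_a = Array.new(3) { sketch_group_id(run_a) }
ids_b = Array.new(3) { sketch_group_id(run_b) }

puts ids_a.inspect
puts ids_a == ids_b
```

Unlike a random id, the n-th anonymous group receives the same id on every run, provided the groups are defined in the same order.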

Instance Method Details

#active? ⇒ Boolean

Returns whether this subscription group is one of the active ones.

Returns:

  • (Boolean)

    whether this subscription group is one of the active ones



# File 'lib/karafka/routing/subscription_group.rb', line 60

def active?
  sgs = Karafka::App.config.internal.routing.active.subscription_groups

  # When empty it means no groups were specified, hence all should be used
  sgs.empty? || sgs.include?(name)
end
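The "empty means all" rule can be isolated in a small sketch. `sketch_active?` is a hypothetical helper that takes the list of active names as an argument instead of reading it from `Karafka::App.config`.

```ruby
# Returns true when no filter is set or when the name is in the filter.
def sketch_active?(name, active_names)
  active_names.empty? || active_names.include?(name)
end

puts sketch_active?('critical', [])
puts sketch_active?('critical', %w[critical])
puts sketch_active?('bulk', %w[critical])
```

With an empty list every group is active; once any name is listed, only the listed groups are.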

#consumer_group_id ⇒ String

Returns consumer group id.

Returns:

  • (String)

    consumer group id



# File 'lib/karafka/routing/subscription_group.rb', line 45

def consumer_group_id
  kafka[:'group.id']
end
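The lookup above uses a quoted symbol because the key contains a dot. A short sketch with a hypothetical settings hash (the values are made up for illustration) shows the difference between the symbol and string forms of the key:

```ruby
# Hypothetical kafka settings; only the 'group.id' key name comes from the method above.
sketch_kafka = {
  'bootstrap.servers': '127.0.0.1:9092',
  'group.id': 'example_app'
}

# A symbol containing a dot must be written in its quoted form.
puts sketch_kafka[:'group.id']

# A string key is a different key and is not found in this hash.
puts sketch_kafka['group.id'].inspect
```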

#max_messages ⇒ Integer

Returns the maximum number of messages fetched in a single go.

Returns:

  • (Integer)

    maximum number of messages fetched in a single go



# File 'lib/karafka/routing/subscription_group.rb', line 50

def max_messages
  @topics.first.max_messages
end

#max_wait_time ⇒ Integer

Returns the maximum number of milliseconds we can wait for incoming messages.

Returns:

  • (Integer)

    maximum number of milliseconds we can wait for incoming messages



# File 'lib/karafka/routing/subscription_group.rb', line 55

def max_wait_time
  @topics.first.max_wait_time
end
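Both `#max_messages` and `#max_wait_time` read from the first topic only. That is safe if all topics in the group share these settings, as the constructor's description of `topics` suggests. A minimal sketch with hypothetical stand-ins (`LimitsTopic`, `SketchLimits`, and the numeric values are illustrative):

```ruby
# Hypothetical stand-in for a topic carrying the two fetch settings.
LimitsTopic = Struct.new(:name, :max_messages, :max_wait_time)

# Simplified sketch of the delegation; not the real class.
class SketchLimits
  def initialize(topics)
    @topics = topics
  end

  def max_messages
    @topics.first.max_messages
  end

  def max_wait_time
    @topics.first.max_wait_time
  end
end

limit_topics = [
  LimitsTopic.new('orders', 100, 1_000),
  LimitsTopic.new('payments', 100, 1_000)
]
limits = SketchLimits.new(limit_topics)

puts limits.max_messages
puts limits.max_wait_time

# The first topic is representative only when the settings are uniform.
puts limit_topics.map(&:max_messages).uniq.size == 1
```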