Class: Karafka::App

Inherits:
Object
Extended by:
Setup::Dsl
Defined in:
lib/karafka/app.rb

Overview

App class

Class Method Summary

Methods included from Setup::Dsl

config, setup

Class Method Details

.assignments ⇒ Hash<Karafka::Routing::Topic, Array<Integer>>

Returns the current assignments of this process, covering both topics and their partitions.

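Returns:

  • (Hash<Karafka::Routing::Topic, Array<Integer>>)

    current assignments of this process, with topics as keys and arrays of partition numbers as values

# File 'lib/karafka/app.rb', line 56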

def assignments
  Instrumentation::AssignmentsTracker.instance.current
end
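
The return type above is a hash keyed by topic objects, with arrays of partition numbers as values. The following is a minimal sketch of how such a hash could be summarized for logging. `SketchTopic` and `summarize_assignments` are hypothetical names introduced only so the example runs without a Kafka cluster; in a running process the input would come from `Karafka::App.assignments`, and the sketch assumes only that topics respond to `name`.

```ruby
# Stand-in for Karafka::Routing::Topic (hypothetical, illustration only).
# Only a `name` attribute is assumed.
SketchTopic = Struct.new(:name)

# Hypothetical helper: converts an assignments-shaped hash into
# { topic_name => sorted partition numbers }.
def summarize_assignments(assignments)
  assignments.to_h { |topic, partitions| [topic.name, partitions.sort] }
end

# Sample data shaped like the documented return value
sample = {
  SketchTopic.new('orders') => [2, 0, 1],
  SketchTopic.new('payments') => [0]
}

summary = summarize_assignments(sample)
summary.each { |name, partitions| puts "#{name}: #{partitions.join(', ')}" }
```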

.consumer_groups ⇒ Karafka::Routing::Builder

Also known as: routes

Returns the routing builder instance that holds the consumer groups (a shortcut for config.internal.routing.builder).

Returns:

  • (Karafka::Routing::Builder)

    routing builder instance that holds the consumer groups

# File 'lib/karafka/app.rb', line 24

def consumer_groups
  config
    .internal
    .routing
    .builder
end

.debug!(contexts = 'all') ⇒ Object

Forces the debug setup onto Karafka and the default WaterDrop producer. This needs to run before any operation that caches state, such as consuming or producing messages.

Parameters:

  • contexts (String) (defaults to: 'all')

    librdkafka low-level debug contexts for granular debugging



# File 'lib/karafka/app.rb', line 104

def debug!(contexts = 'all')
  logger.level = ::Logger::DEBUG
  producer.config.logger.level = ::Logger::DEBUG

  config.kafka[:debug] = contexts
  producer.config.kafka[:debug] = contexts

  consumer_groups.map(&:topics).flat_map(&:to_a).each do |topic|
    topic.kafka[:debug] = contexts
  end
end
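
The method above performs two steps: it raises logger verbosity to DEBUG, then writes the debug contexts into every kafka settings hash (application, producer, and each topic). The sketch below mirrors those steps on plain stdlib objects so it can run without Karafka. `apply_debug` and the sample hashes are hypothetical stand-ins, and the `'broker,topic'` value assumes librdkafka's comma-separated context list. In an application the equivalent call would presumably be `Karafka::App.debug!('broker,topic')`.

```ruby
require 'logger'

# Hypothetical helper mirroring the two steps of debug! on plain objects:
# 1. switch every logger to DEBUG
# 2. set the librdkafka `debug` key on every kafka settings hash
def apply_debug(loggers, kafka_settings, contexts = 'all')
  loggers.each { |logger| logger.level = Logger::DEBUG }
  kafka_settings.each { |kafka| kafka[:debug] = contexts }
end

# Stand-ins for the Karafka logger and the producer logger
app_logger = Logger.new($stdout)
app_logger.level = Logger::INFO
producer_logger = Logger.new($stdout)
producer_logger.level = Logger::INFO

# Stand-ins for the app-wide, producer and per-topic kafka settings
app_kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
producer_kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
topic_kafka = { 'bootstrap.servers': '127.0.0.1:9092' }

apply_debug(
  [app_logger, producer_logger],
  [app_kafka, producer_kafka, topic_kafka],
  'broker,topic'
)

app_logger.debug('debug output is now visible')
puts app_kafka[:debug]
```

Because the settings are written into configuration hashes, anything that has already read them (for example an already-built client) would not pick up the change, which is why the ordering requirement in the description matters.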

.done? ⇒ Boolean

Note:

It is a meta status from the status object

Returns true if we should be done in general with processing anything.

Returns:

  • (Boolean)

    true if we should be done in general with processing anything



# File 'lib/karafka/app.rb', line 79

def done?
  App.config.internal.status.done?
end

.subscription_groups ⇒ Hash

Returns active subscription groups grouped based on consumer group in a hash.

Returns:

  • (Hash)

    active subscription groups grouped based on consumer group in a hash



# File 'lib/karafka/app.rb', line 32

def subscription_groups
  # We first build all the subscription groups, so they all get the same position, despite
  # later narrowing that. It allows us to maintain same position number for static members
  # even when we want to run subset of consumer groups or subscription groups
  #
  # We then narrow this to active consumer groups from which we select active subscription
  # groups.
  consumer_groups
    .map { |cg| [cg, cg.subscription_groups] }
    .select { |cg, _| cg.active? }
    .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } }
    .delete_if { |_, sgs| sgs.empty? }
    .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |top| !top.active? } } }
    .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } }
    .reject { |cg, _| cg.subscription_groups.empty? }
    .to_h
end
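
The narrowing described in the comments above can be illustrated with a small, non-mutating sketch: keep active consumer groups, keep their active subscription groups, strip inactive topics, and drop anything left empty. The `Sketch*` structs and `active_subscription_groups` are hypothetical stand-ins that assume only an `active?` predicate on each level. Unlike the method above, this variant builds new objects instead of deleting in place.

```ruby
# Hypothetical stand-ins for routing objects; each only needs `active?`
SketchSgTopic = Struct.new(:name, :active) do
  def active?
    active
  end
end

SketchSubscriptionGroup = Struct.new(:name, :active, :topics) do
  def active?
    active
  end
end

SketchConsumerGroup = Struct.new(:name, :active, :subscription_groups) do
  def active?
    active
  end
end

# Hypothetical helper returning { consumer_group => [subscription_groups] }
# with all inactive and empty entries filtered out.
def active_subscription_groups(consumer_groups)
  pairs = consumer_groups.select(&:active?).map do |cg|
    sgs = cg.subscription_groups
            .select(&:active?)
            .map { |sg| SketchSubscriptionGroup.new(sg.name, sg.active, sg.topics.select(&:active?)) }
            .reject { |sg| sg.topics.empty? }
    [cg, sgs]
  end

  pairs.reject { |_, sgs| sgs.empty? }.to_h
end

orders = SketchSgTopic.new('orders', true)
audit = SketchSgTopic.new('audit', false)

groups = [
  SketchConsumerGroup.new('app', true, [
    SketchSubscriptionGroup.new('a', true, [orders, audit]),
    SketchSubscriptionGroup.new('b', false, [orders]),
    SketchSubscriptionGroup.new('c', true, [audit])
  ]),
  SketchConsumerGroup.new('legacy', false, [
    SketchSubscriptionGroup.new('d', true, [orders])
  ])
]

result = active_subscription_groups(groups)
result.each do |cg, sgs|
  sgs.each { |sg| puts "#{cg.name}/#{sg.name}: #{sg.topics.map(&:name).join(', ')}" }
end
```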

.warmup ⇒ Object

Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application. In case of older Ruby versions, runs compacting, which is part of the full warmup introduced in Ruby 3.3.



# File 'lib/karafka/app.rb', line 12

def warmup
  # Per recommendation, this should not run in children nodes
  return if Karafka::App.config.swarm.node

  monitor.instrument('app.before_warmup', caller: self)

  return GC.compact unless ::Process.respond_to?(:warmup)

  ::Process.warmup
end
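
The fallback in the method above relies on feature detection rather than a Ruby version check. A minimal sketch of that decision, isolated so it can be exercised without triggering an actual warmup or compaction, might look as follows. `warmup_strategy` and its `process` parameter are hypothetical names introduced for illustration.

```ruby
# Hypothetical helper: reports which warmup path would be taken.
# The `process` parameter defaults to ::Process and exists only so the
# decision can be checked against other objects.
def warmup_strategy(process = ::Process)
  process.respond_to?(:warmup) ? :process_warmup : :gc_compact
end

# On Ruby 3.3 and newer this is expected to report :process_warmup,
# on older versions :gc_compact.
puts "Ruby #{RUBY_VERSION} would use: #{warmup_strategy}"
```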