Class: Rimless::ConsumerApp

Inherits:
Karafka::App
  • Object
Defined in:
lib/rimless/consumer.rb

Overview

The global rimless Apache Kafka consumer application based on the Karafka framework.

The class works as a singleton, which is why it tracks its state in class variables (the source disables the rubocop Style/ClassVars check for this reason).

Constant Summary

@@rimless_initialized = false

We track our own initialization with this class variable.

Class Method Details

.configure(&block) ⇒ Rimless::ConsumerApp

Allows the user to re-configure the Karafka application when needed (e.g. to set some ruby-kafka driver default settings).

Returns:

  • (Rimless::ConsumerApp)

    the consumer application itself

# File 'lib/rimless/consumer.rb', line 117

def configure(&block)
  setup(&block)
  self
end
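
Because the method returns the application itself, calls can be chained. The following is a minimal sketch of that shape only: `DemoApp` and `DemoConfig` are hypothetical stand-ins (so the snippet runs without Karafka), and `shutdown_timeout` is borrowed from `.initialize_karafka!` purely for illustration.

```ruby
# Hypothetical stand-in for Rimless::ConsumerApp, NOT the real class
class DemoApp
  # Hypothetical configuration object with a single setting
  DemoConfig = Struct.new(:shutdown_timeout)

  class << self
    # The shared configuration, with an illustrative default
    def config
      @config ||= DemoConfig.new(10)
    end

    # Stand-in for the framework's setup method: yields the config
    def setup
      yield(config)
    end

    # Same shape as the documented method: delegate, then return self
    def configure(&block)
      setup(&block)
      self
    end
  end
end

app = DemoApp.configure { |config| config.shutdown_timeout = 30 }
puts app.config.shutdown_timeout
```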

.initialize! ⇒ Rimless::ConsumerApp

Initialize the Karafka framework and our global consumer application with all our conventions/opinions.

Returns:

  • (Rimless::ConsumerApp)

    the consumer application itself

# File 'lib/rimless/consumer.rb', line 17

def initialize!
  # When already initialized, skip it
  return self if @@rimless_initialized

  # Initialize all the parts one by one
  initialize_rails!
  initialize_monitors!
  initialize_karafka!
  initialize_logger!
  initialize_code_reload!

  # Load the custom Karafka boot file when it exists, it contains
  # custom configurations and the topic/consumer routing table
  require ::Karafka.boot_file if ::Karafka.boot_file.exist?

  # Set our custom initialization process as completed to
  # skip subsequent calls
  @@rimless_initialized = true
  self
end
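
The guard at the top makes repeated calls harmless. A minimal sketch of that run-once pattern, assuming a hypothetical `DemoConsumerApp` class and a hypothetical boot counter that replaces the real initialization steps:

```ruby
# Hypothetical stand-in that only demonstrates the run-once guard
class DemoConsumerApp
  @@initialized = false
  @@boot_count = 0

  def self.initialize!
    # When already initialized, skip it
    return self if @@initialized

    # Placeholder for the real initialization steps
    @@boot_count += 1

    @@initialized = true
    self
  end

  # How often the initialization body actually ran
  def self.boot_count
    @@boot_count
  end
end

# Calling it twice (even chained) runs the body only once
DemoConsumerApp.initialize!.initialize!
puts DemoConsumerApp.boot_count
```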

.initialize_code_reload! ⇒ Object

Perform code hot-reloading when we are in Rails and in development mode.



# File 'lib/rimless/consumer.rb', line 105

def initialize_code_reload!
  return unless defined?(Rails) && Rails.env.development?

  ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new(
                                *Rails.application.reloaders
                              ))
end

.initialize_karafka! ⇒ Object

Configure the basic settings of the Karafka application.




# File 'lib/rimless/consumer.rb', line 71

def initialize_karafka!
  setup do |config|
    mapper = Rimless::Karafka::PassthroughMapper.new
    config.consumer_mapper = config.topic_mapper = mapper
    config.deserializer = Rimless::Karafka::AvroDeserializer.new
    config.kafka.seed_brokers = Rimless.configuration.kafka_brokers
    config.client_id = Rimless.configuration.client_id
    config.logger = Rimless.logger
    config.backend = :sidekiq
    config.batch_fetching = true
    config.batch_consuming = false
    config.shutdown_timeout = 10
  end
end

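.initialize_logger! ⇒ Object

When we run the Karafka server process in development mode, we want to write the logs to file and to stdout. This is skipped unless the extend_dev_logger configuration is enabled.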



# File 'lib/rimless/consumer.rb', line 90

def initialize_logger!
  # Skip when configured not to extend the logger
  return unless Rimless.configuration.extend_dev_logger

  # Skip unless we are in the development environment and in the server process
  return unless Rimless.env.development? && server?

  $stdout.sync = true
  Rimless.logger.extend(ActiveSupport::Logger.broadcast(
                          ActiveSupport::Logger.new($stdout)
                        ))
end
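
The idea of sending each log line to two destinations can be sketched with the Ruby standard library alone. Note that this swaps in a different technique: the hypothetical `MultiIO` class below is not `ActiveSupport::Logger.broadcast`, and the two `StringIO` objects merely stand in for the log file and `$stdout`.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: fans out every write to all given IO-like targets
class MultiIO
  def initialize(*targets)
    @targets = targets
  end

  # Forward the written data to each target
  def write(*args)
    @targets.each { |target| target.write(*args) }
  end

  # Logger expects its device to respond to close as well
  def close
    @targets.each(&:close)
  end
end

file_like = StringIO.new   # stands in for the log file
stdout_like = StringIO.new # stands in for $stdout

logger = Logger.new(MultiIO.new(file_like, stdout_like))
logger.info('consumer booted')
```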

.initialize_monitors! ⇒ Object

Subscribe the default listeners to instrumentation and logging events, so our users can handle these events as they need.



# File 'lib/rimless/consumer.rb', line 57

def initialize_monitors!
  [
    WaterDrop::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::ProctitleListener.new
  ].each do |listener|
    ::Karafka.monitor.subscribe(listener)
  end
end

.initialize_rails! ⇒ Object

Check if Rails is available and not already initialized, then initialize it.



# File 'lib/rimless/consumer.rb', line 40

def initialize_rails!
  rails_env = ::Karafka.root.join('config', 'environment.rb')

  # Stop, when Rails is already initialized
  return if defined? Rails

  # Stop, when there is no Rails at all
  return unless rails_env.exist?

  ENV['RAILS_ENV'] ||= 'development'
  ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil)
  require rails_env
  Rails.application.eager_load!
end

.server? ⇒ Boolean

Check if we run as the Karafka server (consumer) process or not.

Returns:

  • (Boolean)

    whether we run as the Karafka server or not



# File 'lib/rimless/consumer.rb', line 185

def server?
  $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
end
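
To see which invocations pass this check, here is a small sketch of the same condition with the program name and arguments passed in explicitly. The method name `karafka_server?` and the sample paths are made up for illustration.

```ruby
# Hypothetical variant of the check that takes its inputs as
# parameters instead of reading $PROGRAM_NAME and ARGV
def karafka_server?(program_name, argv)
  program_name.end_with?('karafka') && argv.include?('server')
end

puts karafka_server?('/usr/local/bin/karafka', %w[server])
puts karafka_server?('bin/rails', %w[server])
puts karafka_server?('bin/karafka', %w[console])
```

Only the first call satisfies both conditions: the executable name ends with `karafka` and `server` is among the arguments.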

.topic_names(parts) ⇒ Array<String>

Build the conventional Apache Kafka topic names from the given parts. Accepted forms are a single string or symbol, or a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names: Array<String, Symbol> }+. When the hash contains +names+, one topic name is built per entry. This allows maximum flexibility.

Parameters:

  • parts (String, Symbol, Hash{Symbol => Mixed})

    the topic name parts

Returns:

  • (Array<String>)

    the full topic names



# File 'lib/rimless/consumer.rb', line 170

def topic_names(parts)
  # We have a single app, but multiple names so we handle them
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      Rimless.topic(parts.merge(name: name))
    end
  end

  # Otherwise we build a single topic name from the given parts
  [Rimless.topic(parts)]
end
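
The expansion of the `names` form can be sketched without the library. In this sketch `demo_topic` is a hypothetical stand-in for `Rimless.topic`: it simply joins app and name with a dot, which is an assumption and may differ from the real naming convention.

```ruby
# Hypothetical stand-in for Rimless.topic; the real naming
# convention may differ (this one just joins app and name)
def demo_topic(parts)
  return parts.to_s unless parts.is_a?(Hash)

  "#{parts[:app]}.#{parts[:name]}"
end

# Same control flow as the documented method
def demo_topic_names(parts)
  # A single app, but multiple names: build one topic per name
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      demo_topic(parts.merge(name: name))
    end
  end

  # Otherwise build a single topic name
  [demo_topic(parts)]
end

p demo_topic_names({ app: :test_app, names: %i[users admins] })
p demo_topic_names({ app: :test_app, name: :admins })
p demo_topic_names(:admins)
```

Whatever the input form, the result is always an array, so callers can iterate over it uniformly.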

.topics(topics = []) { ... } ⇒ Object

Configure the topics-consumer routing table in a lean way.

Examples:

topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)

Examples:

topics do
  topic('name') do
    consumer CustomConsumer
  end
end

Parameters:

  • topics (Hash{Hash => Class}) (defaults to: [])

    the topic to consumer mapping

Yields:

  • the given block on the routing table



# File 'lib/rimless/consumer.rb', line 141

def topics(topics = [], &block)
  consumer_groups.draw do
    consumer_group(Rimless.configuration.client_id) do
      instance_exec(&block) if block_given?

      topics.each do |topic_parts, dest_consumer|
        Rimless.consumer.topic_names(topic_parts).each do |topic_name|
          topic(topic_name) do
            consumer dest_consumer
            worker Rimless::ConsumerJob
            interchanger Rimless::Karafka::Base64Interchanger.new
          end
        end
      end
    end
  end

  self
end
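
How the block form and the hash form end up in the same routing table can be illustrated with a toy DSL. Everything here is hypothetical (`DemoRoutes`, `TopicBuilder`, `demo_topics`); it is not Karafka's routing implementation and only mirrors the order of operations: the block is evaluated first via `instance_exec`, then the hash entries are registered.

```ruby
# Hypothetical, minimal routing table; NOT Karafka's routing DSL
class DemoRoutes
  attr_reader :table

  def initialize
    @table = {}
  end

  # Register a topic; the block runs against a small builder so
  # that `consumer SomeClass` works inside it
  def topic(name, &block)
    builder = TopicBuilder.new
    builder.instance_exec(&block) if block
    @table[name.to_s] = builder.consumer_class
  end

  # Collects the per-topic settings (only the consumer here)
  class TopicBuilder
    attr_reader :consumer_class

    def consumer(klass)
      @consumer_class = klass
    end
  end
end

# Same order as the documented method: block first, then the hash
def demo_topics(mapping = {}, &block)
  routes = DemoRoutes.new
  routes.instance_exec(&block) if block

  mapping.each do |topic_name, dest_consumer|
    routes.topic(topic_name) { consumer dest_consumer }
  end

  routes.table
end

users_consumer = Class.new # placeholder consumer classes
audit_consumer = Class.new

table = demo_topics({ 'users' => users_consumer }) do
  topic('audit') { consumer audit_consumer }
end
p table.keys
```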