Class: Rimless::ConsumerApp
- Inherits: Karafka::App
  - Object
    - Karafka::App
      - Rimless::ConsumerApp
- Defined in: lib/rimless/consumer.rb
Overview
The global rimless Apache Kafka consumer application based on the Karafka framework.
rubocop:disable Style/ClassVars because we just work as a singleton
Constant Summary
- @@rimless_initialized = false
  We track our own initialization with this class variable
Class Method Summary
- .configure(&block) ⇒ Rimless::ConsumerApp
  Allows the user to re-configure the Karafka application if this is needed.
- .initialize! ⇒ Rimless::ConsumerApp
  Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
- .initialize_code_reload! ⇒ Object
  Perform code hot-reloading when we are in Rails and in development mode.
- .initialize_karafka! ⇒ Object
  Configure the pure basics on the Karafka application.
- .initialize_logger! ⇒ Object
  When we run in development mode, we want to write the logs to file and to stdout.
- .initialize_monitors! ⇒ Object
  We listen to instrumentation and logging events so that our users can handle them as they need.
- .initialize_rails! ⇒ Object
  Check if Rails is available and not already initialized, then initialize it.
- .server? ⇒ Boolean
  Check if we run as the Karafka server (consumer) process or not.
- .topic_names(parts) ⇒ Array<String>
  Build the conventional Apache Kafka topic names from the given parts.
- .topics(topics = []) { ... } ⇒ Object
  Configure the topics-consumer routing table in a lean way.
Class Method Details
.configure(&block) ⇒ Rimless::ConsumerApp
Allows the user to re-configure the Karafka application if this is needed (e.g. to set some ruby-kafka driver default settings).
    # File 'lib/rimless/consumer.rb', line 117

    def configure(&block)
      setup(&block)
      self
    end
.initialize! ⇒ Rimless::ConsumerApp
Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
    # File 'lib/rimless/consumer.rb', line 17

    def initialize!
      # When already initialized, skip it
      return self if @@rimless_initialized

      # Initialize all the parts one by one
      initialize_rails!
      initialize_monitors!
      initialize_karafka!
      initialize_logger!
      initialize_code_reload!

      # Load the custom Karafka boot file when it exists, it contains
      # custom configurations and the topic/consumer routing table
      require ::Karafka.boot_file if ::Karafka.boot_file.exist?

      # Set our custom initialization process as completed to
      # skip subsequent calls
      @@rimless_initialized = true
      self
    end
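The guard at the top of .initialize! makes the method idempotent: the first call does the work, later calls return immediately. A minimal sketch of that pattern follows; `OnceApp` and its `runs` log are hypothetical stand-ins and not part of Rimless.

```ruby
# Hypothetical stand-in class (not part of Rimless) that shows only the
# "initialize once" guard built on a class variable.
class OnceApp
  @@initialized = false
  @@runs = []

  def self.initialize!
    # Skip all work when a previous call already completed
    return self if @@initialized

    # Stands in for the individual initialize_*! steps
    @@runs << :setup

    # Mark the initialization as completed to skip subsequent calls
    @@initialized = true
    self
  end

  def self.runs
    @@runs
  end
end

# Returning self allows chaining; the second call is a no-op
OnceApp.initialize!.initialize!
puts OnceApp.runs.length
```

Because both branches return the class itself, callers can chain further class methods regardless of whether the setup had already run.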
.initialize_code_reload! ⇒ Object
Perform code hot-reloading when we are in Rails and in development mode.
    # File 'lib/rimless/consumer.rb', line 105

    def initialize_code_reload!
      return unless defined?(Rails) && Rails.env.development?

      ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new(
        *Rails.application.reloaders
      ))
    end
.initialize_karafka! ⇒ Object
Configure the pure basics on the Karafka application.
rubocop:disable Metrics/MethodLength because of the various settings
rubocop:disable Metrics/AbcSize ditto
    # File 'lib/rimless/consumer.rb', line 71

    def initialize_karafka!
      setup do |config|
        mapper = Rimless::Karafka::PassthroughMapper.new
        config.consumer_mapper = config.topic_mapper = mapper
        config.deserializer = Rimless::Karafka::AvroDeserializer.new
        config.kafka.seed_brokers = Rimless.configuration.kafka_brokers
        config.client_id = Rimless.configuration.client_id
        config.logger = Rimless.logger
        config.backend = :sidekiq
        config.batch_fetching = true
        config.batch_consuming = false
        config.shutdown_timeout = 10
      end
    end
.initialize_logger! ⇒ Object
When we run in development mode, we want to write the logs to file and to stdout.
    # File 'lib/rimless/consumer.rb', line 90

    def initialize_logger!
      # Skip when configured not to extend the logger
      return unless Rimless.configuration.extend_dev_logger

      # Skip when not in the development environment or not in the
      # server process
      return unless Rimless.env.development? && server?

      $stdout.sync = true
      Rimless.logger.extend(ActiveSupport::Logger.broadcast(
        ActiveSupport::Logger.new($stdout)
      ))
    end
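The effect of the broadcast is that one log call reaches two destinations. The sketch below reproduces that effect with the Ruby standard library only; it does not use ActiveSupport::Logger.broadcast, and the `TeeIO` helper is a hypothetical name introduced for illustration.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper (not part of Rimless or ActiveSupport): forwards
# every write to all given IO-like targets.
class TeeIO
  def initialize(*targets)
    @targets = targets
  end

  def write(*messages)
    @targets.each { |target| target.write(*messages) }
  end

  # Logger expects its device to respond to #close; the targets stay
  # open here because the caller owns them.
  def close; end
end

file_like = StringIO.new    # stands in for the log file
console_like = StringIO.new # stands in for $stdout

logger = Logger.new(TeeIO.new(file_like, console_like))
logger.info('consumer booted')
```

After the call, both `file_like.string` and `console_like.string` contain the same formatted log line.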
.initialize_monitors! ⇒ Object
We listen to instrumentation and logging events so that our users can handle them as they need.
    # File 'lib/rimless/consumer.rb', line 57

    def initialize_monitors!
      [
        WaterDrop::Instrumentation::StdoutListener.new,
        ::Karafka::Instrumentation::StdoutListener.new,
        ::Karafka::Instrumentation::ProctitleListener.new
      ].each do |listener|
        ::Karafka.monitor.subscribe(listener)
      end
    end
.initialize_rails! ⇒ Object
Check if Rails is available and not already initialized, then initialize it.
    # File 'lib/rimless/consumer.rb', line 40

    def initialize_rails!
      rails_env = ::Karafka.root.join('config', 'environment.rb')

      # Stop, when Rails is already initialized
      return if defined? Rails

      # Stop, when there is no Rails at all
      return unless rails_env.exist?

      ENV['RAILS_ENV'] ||= 'development'
      ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil)
      require rails_env
      Rails.application.eager_load!
    end
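The two ENV assignments give RAILS_ENV a default and then force KARAFKA_ENV to the same value, so both frameworks agree on the environment. A small sketch on a plain Hash (instead of the real ENV) shows the resulting values; `sync_environments` is a hypothetical helper name.

```ruby
# Hypothetical helper that mirrors the two ENV assignments on a plain
# Hash, so the real process environment stays untouched.
def sync_environments(env)
  # Keep an existing RAILS_ENV, fall back to development otherwise
  env['RAILS_ENV'] ||= 'development'
  # KARAFKA_ENV is always overwritten with the RAILS_ENV value
  env['KARAFKA_ENV'] = env.fetch('RAILS_ENV', nil)
  env
end

unset = sync_environments({})
preset = sync_environments(
  { 'RAILS_ENV' => 'production', 'KARAFKA_ENV' => 'test' }
)

puts unset.inspect
puts preset.inspect
```

Note that a KARAFKA_ENV set beforehand does not survive: in the second call it ends up as the RAILS_ENV value.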
.server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
    # File 'lib/rimless/consumer.rb', line 185

    def server?
      $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
    end
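The check requires both conditions: the executable name must end with karafka and the arguments must include server. The sketch below restates the same predicate as a pure function so it can be tried with different inputs; `karafka_server?` and its parameters are hypothetical names.

```ruby
# Hypothetical pure variant of the check: the program name and the
# argument list are passed in instead of read from $PROGRAM_NAME/ARGV.
def karafka_server?(program_name, argv)
  program_name.end_with?('karafka') && argv.include?('server')
end

puts karafka_server?('bin/karafka', %w[server])  # karafka + server
puts karafka_server?('bin/karafka', %w[console]) # karafka, other command
puts karafka_server?('bin/rails', %w[server])    # other executable
```

Only the first call satisfies both conditions; a `rails server` process is not treated as the consumer process.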
.topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts. The parts can be a single string or symbol, or a hash of the form { app: [String, Symbol], name: [String, Symbol], names: [Array<String, Symbol>] }, which allows maximum flexibility.
    # File 'lib/rimless/consumer.rb', line 170

    def topic_names(parts)
      # We have a single app, but multiple names so we handle them
      if parts.is_a?(Hash) && parts.key?(:names)
        return parts[:names].map do |name|
          Rimless.topic(parts.merge(name: name))
        end
      end

      # Otherwise we pass the input through as a single topic
      [Rimless.topic(parts)]
    end
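The branching can be tried without the gem by swapping Rimless.topic for a stand-in. In the sketch below, `sketch_topic`, the constants and the "env.app.name" format are assumptions made for illustration only; the real naming is whatever Rimless.topic produces.

```ruby
# ASSUMPTION: illustrative defaults, not taken from Rimless
SKETCH_ENV = 'test'
SKETCH_APP = 'my_app'

# ASSUMPTION: hypothetical stand-in for Rimless.topic, the
# "env.app.name" format is only illustrative
def sketch_topic(parts)
  parts = { name: parts } unless parts.is_a?(Hash)
  app = parts.fetch(:app, SKETCH_APP)
  "#{SKETCH_ENV}.#{app}.#{parts.fetch(:name)}"
end

# Same branching as .topic_names, using the stand-in above
def sketch_topic_names(parts)
  # A single app with multiple names expands to one topic per name
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      sketch_topic(parts.merge(name: name))
    end
  end

  # Everything else is treated as a single topic
  [sketch_topic(parts)]
end

many = sketch_topic_names({ app: :shop, names: %i[users admins] })
one = sketch_topic_names({ app: :shop, name: :orders })
bare = sketch_topic_names(:payments)

puts many.inspect, one.inspect, bare.inspect
```

Whatever the input form, the result is always an array, which lets callers iterate without checking the type first.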
.topics(topics = []) { ... } ⇒ Object
Configure the topics-consumer routing table in a lean way.
Examples:
    topics({ app: :test_app, name: :admins } => YourConsumer)
    topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
Examples:
    topics do
      topic('name') do
        consumer CustomConsumer
      end
    end
rubocop:disable Metrics/MethodLength because of the Karafka DSL
    # File 'lib/rimless/consumer.rb', line 141

    def topics(topics = [], &block)
      consumer_groups.draw do
        consumer_group(Rimless.configuration.client_id) do
          instance_exec(&block) if block_given?

          topics.each do |topic_parts, dest_consumer|
            Rimless.consumer.topic_names(topic_parts).each do |topic_name|
              topic(topic_name) do
                consumer dest_consumer
                worker Rimless::ConsumerJob
                interchanger Rimless::Karafka::Base64Interchanger.new
              end
            end
          end
        end
      end

      self
    end
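The hash form of the routing table is expanded into one topic registration per generated topic name, each bound to the consumer given as the hash value. The sketch below shows only that expansion as a flat list of pairs; the consumer classes, `sketch_names` and the "app.name" format are hypothetical, and no Karafka routing is performed.

```ruby
# Hypothetical consumer classes, used only as routing targets
class UserConsumer; end
class AuditConsumer; end

# ASSUMPTION: illustrative naming ("app.name"), standing in for
# Rimless.consumer.topic_names
def sketch_names(parts)
  names = parts.key?(:names) ? parts[:names] : [parts[:name]]
  names.map { |name| "#{parts[:app]}.#{name}" }
end

# Expand the routing table into [topic_name, consumer] pairs, in the
# same iteration order as the nested loops in .topics
def sketch_routes(topics)
  topics.flat_map do |topic_parts, dest_consumer|
    sketch_names(topic_parts).map do |topic_name|
      [topic_name, dest_consumer]
    end
  end
end

routes = sketch_routes(
  { app: :test_app, names: %i[users admins] } => UserConsumer,
  { app: :test_app, name: :audits } => AuditConsumer
)

routes.each { |name, consumer| puts "#{name} -> #{consumer}" }
```

One entry with several names fans out to several topics that share a consumer, which is why the hash form stays compact for larger routing tables.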