Class: Dionysus::Consumer::KarafkaConsumerGenerator

Inherits:
Object
  • Object
show all
Defined in:
lib/dionysus/consumer/karafka_consumer_generator.rb

Constant Summary collapse

TOMBSTONE =
nil

Instance Method Summary collapse

Instance Method Details

#generate(config, topic) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/dionysus/consumer/karafka_consumer_generator.rb', line 6

# Builds an anonymous consumer class for +topic+ and registers it under the
# +Dionysus+ namespace as "<Classified topic name>Consumer", replacing any
# class this generator registered under that name earlier.
#
# The generated class gets a public +consume+ method and a +process_batch+
# helper declared after +private+.
#
# @param config [Object] must respond to +consumer_base_class+,
#   +retry_provider+, +instrumenter+ and +transaction_provider+
# @param topic [Object] must respond to +consumer_base_class+, +concurrency+,
#   +params_batch_transformation+ and +to_s+ (whose result must respond to
#   +classify+)
# @return [Class] the generated consumer class
def generate(config, topic)
  base_class = topic.consumer_base_class || config.consumer_base_class

  consumer_klass = Class.new(base_class) do
    # Processes every batch exposed by +messages+ (presumably provided by the
    # base class -- confirm), then publishes all collected events at once.
    define_method :consume do
      config.retry_provider.retry do
        processed_events = Concurrent::Array.new
        config.instrumenter.instrument("dionysus.consume.#{topic}") do
          batch_number = 0

          if topic.concurrency
            # NOTE(review): unlike the sequential branch, this branch does not
            # apply topic.params_batch_transformation -- confirm it is intended.
            workers = Dionysus::Consumer::WorkersGroup.new
            messages.each do |batch|
              batch_number += 1 # cannot use each_with_index on params_batch
              # The number is handed over as a thread argument so that every
              # thread works with its own copy. Reading the shared outer local
              # inside the thread would race with the increments above and
              # could label a batch with a later batch's number.
              worker = Thread.new(batch_number) do |current_batch_number|
                Thread.current.report_on_exception = true
                Thread.current.abort_on_exception = true
                processed_events.concat(process_batch(config, topic, batch, current_batch_number))
              end
              workers << worker
            end
            workers.work
          else
            final_params_batch = topic.params_batch_transformation&.call(messages) || messages
            final_params_batch.each do |batch|
              batch_number += 1 # cannot use each_with_index on params_batch
              processed_events.concat(process_batch(config, topic, batch, batch_number))
            end
          end
        end
        Dionysus::Consumer::BatchEventsPublisher.new(config, topic).publish(processed_events)
      end
    end

    private

    # Runs a single batch through ParamsBatchProcessor inside a transaction
    # obtained from the configuration's transaction provider.
    define_method :process_batch do |configuration, current_topic, batch, batch_number|
      configuration.transaction_provider.transaction do
        Dionysus::Consumer::ParamsBatchProcessor.new(configuration, current_topic).process(batch,
          batch_number)
      end
    end
  end

  consumer_klass_name = "#{topic.to_s.classify}Consumer"

  # Pass inherit = false: the default lookup also searches Object, so a
  # same-named top-level constant would make the check succeed and
  # remove_const raise NameError, because Dionysus does not own that constant.
  Dionysus.send(:remove_const, consumer_klass_name) if Dionysus.const_defined?(consumer_klass_name, false)
  Dionysus.const_set(consumer_klass_name, consumer_klass)
  consumer_klass
end