Module: Deimos::ActiveRecordConsume::BatchConsumption

Included in:
Deimos::ActiveRecordConsumer
Defined in:
lib/deimos/active_record_consume/batch_consumption.rb

Overview

Methods for consuming batches of messages and saving them to the database in bulk ActiveRecord operations.

Instance Method Summary

Instance Method Details

#consume_batch(payloads, metadata) ⇒ void

This method returns an undefined value.

Handle a batch of Kafka messages. Batches are split into “slices”, which are groups of independent messages that can be processed together in a single database operation. If two messages in a batch have the same key, we cannot process them in the same operation, as they would interfere with each other. Thus they are split into separate slices.
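
The slicing rule described above can be illustrated with a minimal sketch. This is not the library's own slicing code: `Msg` and `slice_by_key` are hypothetical names introduced only for this example, and `Msg` is a plain stand-in rather than `Deimos::Message`. The sketch assumes that messages sharing a key must keep their arrival order across slices.

```ruby
# Hypothetical stand-in for a keyed message (not Deimos::Message).
Msg = Struct.new(:key, :payload)

# Split a batch into slices so that no slice holds two messages with the
# same key. The i-th message seen for a key lands in the i-th slice, so
# per-key ordering is preserved from one slice to the next.
def slice_by_key(messages)
  by_key = messages.group_by(&:key)            # key => messages in arrival order
  depth = by_key.values.map(&:length).max || 0 # an empty batch yields no slices
  Array.new(depth) do |i|
    by_key.values.map { |group| group[i] }.compact
  end
end

batch = [
  Msg.new(1, 'a'),
  Msg.new(2, 'b'),
  Msg.new(1, 'c') # same key as the first message, so it needs its own slice
]

slices = slice_by_key(batch)
slices.each_with_index do |slice, i|
  puts "slice #{i}: #{slice.map(&:payload).inspect}"
end
```

Each resulting slice contains at most one message per key, so it could be applied in a single bulk operation without two writes touching the same record.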

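Parameters:

payloads: the payloads of the messages in the batch.
metadata: information about the batch; this method reads its :keys and :topic entries.
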
# File 'lib/deimos/active_record_consume/batch_consumption.rb', line 26

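def consume_batch(payloads, metadata)
  # Pair each payload with its key and wrap the pair in a Deimos::Message.
  messages = payloads.
    zip(metadata[:keys]).
    map { |p, k| Deimos::Message.new(p, nil, key: k) }

  # Tag the active tracing span with the topic being consumed.
  tag = metadata[:topic]
  Deimos.config.tracer.active_span.set_tag('topic', tag)

  Deimos.instrument('ar_consumer.consume_batch', tag) do
    # Compact the batch first when the consumer is compacted or keyless;
    # otherwise hand the full batch to uncompacted_update.
    if @compacted || self.class.config[:no_keys]
      update_database(compact_messages(messages))
    else
      uncompacted_update(messages)
    end
  end
end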
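
The compacted branch above passes the batch through compact_messages before writing it. That helper is not shown on this page, so the following is only a sketch under an assumption: that compaction means keeping the most recent message for each key, in the sense of Kafka log compaction. `KeyedMsg` and `compact_by_key` are hypothetical names used for illustration.

```ruby
# Hypothetical stand-in for a keyed message (not Deimos::Message).
KeyedMsg = Struct.new(:key, :payload)

# Keep only the last message seen for each key, preserving the relative
# order of the survivors. Assumption: this is what "compacting" a batch
# means here; the real helper may differ.
def compact_by_key(messages)
  # Reversing first makes uniq keep the latest occurrence of each key.
  messages.reverse.uniq(&:key).reverse
end

batch = [
  KeyedMsg.new(1, 'a'),
  KeyedMsg.new(2, 'b'),
  KeyedMsg.new(1, 'c') # supersedes the first message for key 1
]

compacted = compact_by_key(batch)
puts compacted.map(&:payload).inspect
```

After compaction every key appears at most once, which is why the whole batch can then go to the database in one call instead of being divided into slices.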