Class: Deimos::Utils::DbProducer

Inherits:
Object
Includes:
Phobos::Producer
Defined in:
lib/deimos/utils/db_producer.rb

Overview

Class which continually polls the kafka_messages table in the database and sends Kafka messages.
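For example, a minimal way to run one (a sketch; assumes Deimos, ActiveRecord and Phobos are already configured):

producer = Deimos::Utils::DbProducer.new(Logger.new(STDOUT))
trap('TERM') { producer.stop } # stop merely sets a flag the poll loop checks
producer.start                 # blocks, polling kafka_messages until stopped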

Constant Summary

BATCH_SIZE = 1000
  Returns: (Integer)

DELETE_BATCH_SIZE = 10
  Returns: (Integer)

MAX_DELETE_ATTEMPTS = 3
  Returns: (Integer)

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer

Returns a new instance of DbProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


# File 'lib/deimos/utils/db_producer.rb', line 19

def initialize(logger=Logger.new(STDOUT))
  @id = SecureRandom.uuid
  @logger = logger
  @logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags)
end
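
For example (a sketch; ActiveSupport::TaggedLogging is one logger that responds to push_tags):

# Plain logger: no tagging.
producer = Deimos::Utils::DbProducer.new(Logger.new(STDOUT))

# Tagged logger: every line is prefixed with "DbProducer <uuid>".
tagged = ActiveSupport::TaggedLogging.new(Logger.new(STDOUT))
producer = Deimos::Utils::DbProducer.new(tagged)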

Instance Attribute Details

#current_topic ⇒ Object

Returns the value of attribute current_topic.



# File 'lib/deimos/utils/db_producer.rb', line 9

def current_topic
  @current_topic
end

#id ⇒ Object

Returns the value of attribute id.



# File 'lib/deimos/utils/db_producer.rb', line 9

def id
  @id
end

Instance Method Details

#compact_messages(batch) ⇒ Array<Deimos::KafkaMessage>

Parameters:

  • batch (Array<Deimos::KafkaMessage>)

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 244

def compact_messages(batch)
  return batch if batch.first&.key.blank?

  topic = batch.first.topic
  return batch if config.compact_topics != :all &&
                  !config.compact_topics.include?(topic)

  batch.reverse.uniq(&:key).reverse!
end
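
To illustrate the compaction semantics (hypothetical messages; only the newest message per key survives, and survivors keep their relative order):

require 'ostruct'

batch = [
  OpenStruct.new(key: 'a', payload: 1),
  OpenStruct.new(key: 'b', payload: 2),
  OpenStruct.new(key: 'a', payload: 3), # supersedes ('a', 1)
  OpenStruct.new(key: 'c', payload: 4)
]

batch.reverse.uniq(&:key).reverse!.map { |m| [m.key, m.payload] }
# => [["b", 2], ["a", 3], ["c", 4]]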

#config ⇒ FigTree

Returns:

  • (FigTree)


# File 'lib/deimos/utils/db_producer.rb', line 26

def config
  Deimos.config.db_producer
end

#delete_messages(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 122

def delete_messages(messages)
  attempts = 1
  begin
    messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
      Deimos::KafkaMessage.where(topic: batch.first.topic,
                                 id: batch.map(&:id)).
        delete_all
    end
  rescue StandardError => e
    if (e.message =~ /Lock wait/i || e.message =~ /Lost connection/i) &&
       attempts <= MAX_DELETE_ATTEMPTS
      attempts += 1
      ActiveRecord::Base.connection.verify!
      sleep(1)
      retry
    end
    raise
  end
end
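
For example, the DELETE_BATCH_SIZE grouping deletes 25 messages in three statements of 10, 10 and 5 rows (in_groups_of is ActiveSupport; passing false skips padding the last group):

(1..25).to_a.in_groups_of(10, false).map(&:size)
# => [10, 10, 5]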

#log_messages(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 149

def log_messages(messages)
  return if config.log_topics != :all && !config.log_topics.include?(@current_topic)

  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}"
  end
end

#process_next_messages ⇒ void

This method returns an undefined value.

Complete one loop of processing all messages in the DB.



# File 'lib/deimos/utils/db_producer.rb', line 55

def process_next_messages
  topics = retrieve_topics
  @logger.info("Found topics: #{topics}")
  topics.each(&method(:process_topic))
  KafkaTopicInfo.ping_empty_topics(topics)
  sleep(0.5)
end

#process_topic(topic) ⇒ String?

Returns the topic that was locked, or nil if it could not be locked.

Parameters:

  • topic (String)

Returns:

  • (String, nil)

    the topic that was locked, or nil if it could not be locked.



# File 'lib/deimos/utils/db_producer.rb', line 70

def process_topic(topic)
  # If the topic is already locked, another producer is currently
  # working on it. Move on to the next one.
  unless KafkaTopicInfo.lock(topic, @id)
    @logger.debug("Could not lock topic #{topic} - continuing")
    return
  end
  @current_topic = topic

  loop { break unless process_topic_batch }

  KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
  shutdown_producer
end
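
The locking contract, as inferred from this method's use of it (KafkaTopicInfo is Deimos' lock-table model):

if KafkaTopicInfo.lock(topic, producer_id) # truthy only if we now own the topic
  # ... produce batches, heartbeating periodically to keep the lock ...
  KafkaTopicInfo.clear_lock(topic, producer_id) # release on success
else
  # another DbProducer holds the lock; skip this topic
end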

#process_topic_batch ⇒ Boolean

Process a single batch in a topic. Returns true if a full batch was sent and more messages may remain, or false if the topic is exhausted for now.



# File 'lib/deimos/utils/db_producer.rb', line 90

def process_topic_batch
  messages = retrieve_messages
  return false if messages.empty?

  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:phobos_message))
    rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
      delete_messages(messages)
      @logger.error('Message batch too large, deleting...')
      @logger.error(Deimos::KafkaMessage.decoded(messages))
      raise
    end
  end
  delete_messages(messages)
  Deimos.config.metrics&.increment(
    'db_producer.process',
    tags: %W(topic:#{@current_topic}),
    by: messages.size
  )
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

#produce_messages(batch) ⇒ void

This method returns an undefined value.

Produce messages in batches, reducing the batch size to 1/10 of its previous value whenever the batch is too large. Does not retry messages that have already been sent.

Parameters:

  • batch (Array<Hash>)


# File 'lib/deimos/utils/db_producer.rb', line 209

def produce_messages(batch)
  batch_size = batch.size
  current_index = 0
  begin
    batch[current_index..-1].in_groups_of(batch_size, false).each do |group|
      @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
      producer.publish_list(group)
      Deimos.config.metrics&.increment(
        'publish',
        tags: %W(status:success topic:#{@current_topic}),
        by: group.size
      )
      current_index += group.size
      @logger.info("Sent #{group.size} messages to #{@current_topic}")
    end
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    if batch_size == 1
      shutdown_producer
      raise
    end

    @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
    batch_size = if batch_size < 10
                   1
                 else
                   (batch_size / 10).to_i
                 end
    shutdown_producer
    retry
  end
end
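
For example, a 1,000-message batch that keeps overflowing is retried in progressively smaller groups (integer division, with any size below 10 dropping straight to 1):

size = 1000
until size == 1
  puts size
  size = size < 10 ? 1 : size / 10
end
puts size
# Prints 1000, 100, 10, 1; a failure at size 1 then re-raises.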

#retrieve_messages ⇒ Array<Deimos::KafkaMessage>

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 143

def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end

#retrieve_topics ⇒ Array<String>

Returns:

  • (Array<String>)


# File 'lib/deimos/utils/db_producer.rb', line 64

def retrieve_topics
  KafkaMessage.select('distinct topic').map(&:topic).uniq
end

#send_pending_metrics ⇒ void

This method returns an undefined value.

Send metrics related to pending messages.



# File 'lib/deimos/utils/db_producer.rb', line 160

def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  topics = KafkaTopicInfo.select(%w(topic last_processed_at))
  messages = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic).
    index_by(&:topic)
  topics.each do |record|
    message_record = messages[record.topic]
    # We want to record the last time we saw any activity, meaning either
    # the oldest message, or the last time we processed, whichever comes
    # last.
    if message_record
      record_earliest = message_record.earliest
      # SQLite gives a string here
      if record_earliest.is_a?(String)
        record_earliest = Time.zone.parse(record_earliest)
      end

      earliest = [record.last_processed_at, record_earliest].max
      time_diff = Time.zone.now - earliest
      metrics.gauge('pending_db_messages_max_wait', time_diff,
                    tags: ["topic:#{record.topic}"])
    else
      # no messages waiting
      metrics.gauge('pending_db_messages_max_wait', 0,
                    tags: ["topic:#{record.topic}"])
    end
    metrics.gauge('pending_db_messages_count', message_record&.num_messages || 0,
                  tags: ["topic:#{record.topic}"])
  end
end
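
A worked example of the max-wait calculation (hypothetical timestamps; assumes ActiveSupport with a configured time zone):

# Topic last processed 10 minutes ago; oldest pending message is 3 minutes old.
last_processed_at = Time.zone.now - 10.minutes
earliest          = Time.zone.now - 3.minutes

# "Last activity" is whichever happened later; the gauge reports time since then.
Time.zone.now - [last_processed_at, earliest].max
# => ~180.0 seconds, so pending_db_messages_max_wait reads about 3 minutes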

#shutdown_producer ⇒ void

This method returns an undefined value.

Shut down the sync producer if necessary; Phobos will automatically create a new one. Call this when the producer may be in a bad state, e.g. when its buffer needs to be cleared.



# File 'lib/deimos/utils/db_producer.rb', line 199

def shutdown_producer
  if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
    self.class.producer.sync_producer_shutdown
  end
end

#start ⇒ void

This method returns an undefined value.

Start the poll.



# File 'lib/deimos/utils/db_producer.rb', line 32

def start
  @logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect!
  loop do
    if @signal_to_stop
      @logger.info('Shutting down')
      break
    end
    send_pending_metrics
    process_next_messages
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



# File 'lib/deimos/utils/db_producer.rb', line 48

def stop
  @logger.info('Received signal to stop')
  @signal_to_stop = true
end