Class: Deimos::Utils::DbProducer

Inherits:
Object
Includes:
Phobos::Producer
Defined in:
lib/deimos/utils/db_producer.rb

Overview

Class which continually polls the database and sends Kafka messages.

Constant Summary collapse

BATCH_SIZE =
1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer

Returns a new instance of DbProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


13
14
15
16
17
# File 'lib/deimos/utils/db_producer.rb', line 13

# Returns a new instance of DbProducer.
# Assigns a random UUID as this producer's identity and tags the logger
# with it when the logger supports tagging (e.g. ActiveSupport tagged
# logging; a plain Logger does not respond to push_tags and is left as-is).
# @param logger [Logger]
def initialize(logger=Logger.new(STDOUT))
  @logger = logger
  @id = SecureRandom.uuid
  tag = "DbProducer #{@id}"
  @logger.push_tags(tag) if @logger.respond_to?(:push_tags)
end

Instance Attribute Details

#current_topicObject

Returns the value of attribute current_topic.



8
9
10
# File 'lib/deimos/utils/db_producer.rb', line 8

# Reader for the topic currently being processed (assigned in
# #process_topic after the topic lock is acquired).
def current_topic
  @current_topic
end

#idObject

Returns the value of attribute id.



8
9
10
# File 'lib/deimos/utils/db_producer.rb', line 8

# Reader for this producer's unique ID (a UUID assigned in #initialize).
# Used to identify lock ownership when calling KafkaTopicInfo.
def id
  @id
end

Instance Method Details

#compact_messages(batch) ⇒ Array<Deimos::KafkaMessage>

Parameters:

Returns:



184
185
186
187
188
189
190
191
192
# File 'lib/deimos/utils/db_producer.rb', line 184

# Compact a batch of messages: for topics configured for compaction,
# keep only the latest message for each key, preserving relative order.
# Batches without keys, or for topics not configured for compaction,
# are returned unchanged.
# @param batch [Array<Deimos::KafkaMessage>]
# @return [Array<Deimos::KafkaMessage>]
def compact_messages(batch)
  # Can't compact without keys to compare on.
  return batch unless batch.first&.key.present?

  topic = batch.first.topic
  return batch if config.compact_topics != :all &&
                  !config.compact_topics.include?(topic)

  # Iterate newest-first so uniq keeps the most recent message per key,
  # then restore the original ordering. Must use non-destructive uniq:
  # uniq! returns nil when there are no duplicates, which would crash
  # the subsequent reverse! with a NoMethodError.
  batch.reverse.uniq(&:key).reverse!
end

#configDeimos::DbProducerConfig

Returns:

  • (Deimos::DbProducerConfig)


20
21
22
# File 'lib/deimos/utils/db_producer.rb', line 20

# Convenience accessor for the db_producer section of the global
# Deimos configuration.
# @return [Deimos::DbProducerConfig]
def config
  Deimos.config.db_producer
end

#log_messages(messages) ⇒ Object

Parameters:



114
115
116
117
118
119
120
121
# File 'lib/deimos/utils/db_producer.rb', line 114

# Debug-log the decoded contents of a batch, but only for topics the
# config says to log (:all or an explicit topic list). Decoding happens
# inside the logger block so no work is done unless debug logging is on.
# @param messages [Array<Deimos::KafkaMessage>]
def log_messages(messages)
  return if config.log_topics != :all && !config.log_topics.include?(@current_topic)

  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    # Fixed: the string previously ended in "#{decoded_messages}}" with a
    # stray literal brace appended to every log line.
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}"
  end
end

#process_next_messagesObject

Complete one loop of processing all messages in the DB.



46
47
48
49
50
51
# File 'lib/deimos/utils/db_producer.rb', line 46

# Complete one loop of processing all messages in the DB: find every
# topic that currently has pending messages and process each in turn,
# then pause briefly so an empty table doesn't cause a busy-spin.
def process_next_messages
  topics = retrieve_topics
  @logger.info("Found topics: #{topics}")
  topics.each { |topic| process_topic(topic) }
  sleep(0.5)
end

#process_topic(topic) ⇒ String

Returns the topic that was locked, or nil if none were.

Parameters:

  • topic (String)

Returns:

  • (String)

    the topic that was locked, or nil if none were.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/deimos/utils/db_producer.rb', line 60

# Attempt to take ownership of a topic and drain all of its pending
# messages. Skips the topic entirely when another producer holds the
# lock. Any error while draining is logged and recorded on the lock row
# (the lock is deliberately NOT cleared so the error state is visible).
# @param topic [String]
# @return [String] the topic that was locked, or nil if none were.
def process_topic(topic)
  # Another producer is currently working on this topic - move on.
  unless KafkaTopicInfo.lock(topic, @id)
    @logger.debug("Could not lock topic #{topic} - continuing")
    return
  end
  @current_topic = topic

  # Drain batches until a partial/empty batch signals we're done.
  while process_topic_batch; end

  KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
end

#process_topic_batchObject

Process a single batch in a topic.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/deimos/utils/db_producer.rb', line 78

# Process a single batch in a topic: fetch up to BATCH_SIZE messages,
# compact and publish them, then delete them from the queue table.
# @return [Boolean] true when a full batch was processed (more messages
#   may remain), false when the topic is drained for now.
def process_topic_batch
  messages = retrieve_messages
  return false if messages.empty?

  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:phobos_message))
    rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
      # Even single messages were too large to send. Delete the batch
      # rather than blocking the topic forever, log what was dropped,
      # then re-raise so the caller records the error on the lock.
      Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
      @logger.error('Message batch too large, deleting...')
      @logger.error(Deimos::KafkaMessage.decoded(messages))
      raise
    end
  end
  # Publish succeeded - remove the messages from the queue table.
  Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
  Deimos.config.metrics&.increment(
    'db_producer.process',
    tags: %W(topic:#{@current_topic}),
    by: messages.size
  )
  # A partial batch means the topic has no more pending messages.
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

#produce_messages(batch) ⇒ Object

Parameters:

  • batch (Array<Hash>)


151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/deimos/utils/db_producer.rb', line 151

# Publish a batch of Phobos message hashes to Kafka. If Kafka rejects
# the payload for being too large, retry with successively smaller
# groups (roughly a tenth of the previous size each time, bottoming out
# at one message per publish); if a single message still fails, give up
# and re-raise to the caller.
# @param batch [Array<Hash>]
def produce_messages(batch)
  batch_size = batch.size
  begin
    batch.in_groups_of(batch_size, false).each do |group|
      @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
      producer.publish_list(group)
      Deimos.config.metrics&.increment(
        'publish',
        tags: %W(status:success topic:#{@current_topic}),
        by: group.size
      )
      @logger.info("Sent #{group.size} messages to #{@current_topic}")
    end
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    # Already down to single messages - nothing smaller to try.
    if batch_size == 1
      shutdown_producer
      raise
    end

    @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
    # Shrink the group size by 10x (minimum 1) and retry the WHOLE batch.
    # NOTE(review): groups already sent before the failure are
    # re-published on retry - presumably downstream consumers tolerate
    # duplicates; confirm.
    if batch_size < 10
      batch_size = 1
    else
      batch_size /= 10
    end
    # The producer buffer may be in a bad state after the failure.
    shutdown_producer
    retry
  end
end

#retrieve_messagesArray<Deimos::KafkaMessage>

Returns:



109
110
111
# File 'lib/deimos/utils/db_producer.rb', line 109

# Fetch the next batch of pending messages for the current topic,
# oldest (lowest id) first.
# @return [Array<Deimos::KafkaMessage>] up to BATCH_SIZE messages.
def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end

#retrieve_topicsArray<String>

Returns:

  • (Array<String>)


54
55
56
# File 'lib/deimos/utils/db_producer.rb', line 54

# List all topics that currently have messages waiting in the table.
# @return [Array<String>]
def retrieve_topics
  KafkaMessage.select('distinct topic').map(&:topic).uniq
end

#send_pending_metricsObject

Send metrics to Datadog.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/deimos/utils/db_producer.rb', line 124

# Send metrics to Datadog: for each topic with pending messages, gauge
# how long the oldest message has been waiting; when nothing is pending,
# gauge an explicit zero so the metric doesn't go stale.
def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  pending = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic)
  metrics.gauge('pending_db_messages_max_wait', 0) if pending.none?
  pending.each do |row|
    wait = Time.zone.now - row.earliest
    metrics.gauge('pending_db_messages_max_wait', wait,
                  tags: ["topic:#{row.topic}"])
  end
end

#shutdown_producerObject

Shut down the sync producer if we have to. Phobos will automatically create a new one. We should call this if the producer can be in a bad state and e.g. we need to clear the buffer.



144
145
146
147
148
# File 'lib/deimos/utils/db_producer.rb', line 144

# Shut down the sync producer if we have to. Phobos will automatically
# create a new one. We should call this if the producer can be in a bad
# state and e.g. we need to clear the buffer.
def shutdown_producer
  phobos_producer = self.class.producer
  # sync_producer_shutdown only exists as of Phobos 1.8.3.
  return unless phobos_producer.respond_to?(:sync_producer_shutdown)

  phobos_producer.sync_producer_shutdown
end

#startObject

Start the poll.



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/deimos/utils/db_producer.rb', line 25

# Start the poll: reconnect the database connection, then repeatedly
# publish pending-message metrics and process all waiting messages until
# #stop flips the shutdown flag.
# NOTE(review): the reconnect! presumably guards against a stale or
# forked connection at startup - confirm.
def start
  @logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect!
  until @signal_to_stop
    send_pending_metrics
    process_next_messages
  end
  @logger.info('Shutting down')
  nil # match the original's nil return from the broken loop
end

#stopObject

Stop the poll.



40
41
42
43
# File 'lib/deimos/utils/db_producer.rb', line 40

# Stop the poll. Only sets a flag; the loop in #start checks it at the
# top of each iteration, so the in-flight batch finishes cleanly first.
def stop
  @logger.info('Received signal to stop')
  @signal_to_stop = true
end