Class: Deimos::Utils::DbPoller::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/base.rb

Overview

Base poller class for retrieving and publishing messages.

Direct Known Subclasses

StateBased, TimeBased

Constant Summary collapse

BATCH_SIZE =

Returns:

  • (Integer)
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.

Parameters:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/deimos/utils/db_poller/base.rb', line 32

# Builds a poller from its configuration and validates its producers.
#
# A random hex identifier is stored in @id. The resource class is the poller
# class itself when it declares any producers, otherwise the constantized
# @config.producer_class. Every producer class is then validated.
#
# @param config [Object] poller configuration; must respond to
#   poller_class and producer_class
# @raise [RuntimeError] if neither poller_class nor producer_class is set,
#   or if a NameError occurs while resolving/validating producer classes
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    unconfigured = @config.poller_class.nil? && @config.producer_class.nil?
    raise 'No producers have been set for this DB poller!' if unconfigured

    @resource_class = if self.class.producers.any?
                        self.class
                      else
                        @config.producer_class.constantize
                      end

    producer_classes.each { |klass| validate_producer_class(klass) }
  rescue NameError
    # Any NameError in the section above is reported as a missing class.
    raise "Class #{@config.producer_class} not found!"
  end
end

Instance Attribute Details

#config ⇒ Hash (readonly)

Returns:

  • (Hash)


22
23
24
# File 'lib/deimos/utils/db_poller/base.rb', line 22

# Reader for the configuration object held in @config.
# @return [Object] whatever was stored in @config
def config
  @config
end

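#id ⇒ String (readonly)

Needed for Executor so it can identify the worker. The value is generated with SecureRandom.hex in the constructor, so it is a hexadecimal String rather than an Integer.

Returns:

  • (String)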


19
20
21
# File 'lib/deimos/utils/db_poller/base.rb', line 19

# Reader for the identifier held in @id.
# NOTE(review): the constructor assigns SecureRandom.hex to @id, so this is a
# hex String even though the generated docs list the type as Integer.
# @return [String]
def id
  @id
end

Class Method Details

.producers ⇒ Array<Producer>

Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized.

Returns:



27
28
29
# File 'lib/deimos/utils/db_poller/base.rb', line 27

# Default producer list for this poller class: empty.
# Elsewhere in this class, an empty list means the producer class from the
# configuration is used instead.
# @return [Array] an empty array
def self.producers
  []
end

Instance Method Details

#create_poll_info ⇒ Deimos::PollInfo

Returns:



84
85
86
# File 'lib/deimos/utils/db_poller/base.rb', line 84

# Creates a PollInfo row for this poller's resource class.
# last_sent is initialised to Time.new(0).
# @return [Deimos::PollInfo] the result of Deimos::PollInfo.create!
def create_poll_info
  attributes = { producer: @resource_class.to_s, last_sent: Time.new(0) }
  Deimos::PollInfo.create!(**attributes)
end

#handle_message_too_large(exception, batch, status, span) ⇒ Boolean

Parameters:

  • exception (Exception)
  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)
  • span (Object)

Returns:

  • (Boolean)


113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/deimos/utils/db_poller/base.rb', line 113

# Decides what to do when a batch could not be published because of its size.
#
# The error is always logged. When @config.skip_too_large_messages is set, the
# batch ids are logged, the error is recorded on the span (if a tracer is
# configured), the errored-batch counter is bumped and true is returned.
# Otherwise the method sleeps for half a second and returns false.
#
# @param exception [Exception]
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus]
# @param span [Object]
# @return [Boolean] true if the batch was skipped, false otherwise
def handle_message_too_large(exception, batch, status, span)
  Deimos.config.logger.error("Error publishing through DB Poller: #{exception.message}")

  unless @config.skip_too_large_messages
    # do the same thing as regular Kafka::Error
    sleep(0.5)
    return false
  end

  skipped_ids = batch.map(&:id).join(', ')
  Deimos.config.logger.error("Skipping messages #{skipped_ids} since they are too large")
  Deimos.config.tracer&.set_error(span, exception)
  status.batches_errored += 1
  true
end

#log_identifier ⇒ String

Configure log identifier and messages to be used in subclasses

Returns:

  • (String)


177
178
179
# File 'lib/deimos/utils/db_poller/base.rb', line 177

# Builds the identifier used in log messages: the resource class name
# followed by the list of producer topics.
# @return [String]
def log_identifier
  resource_name = @resource_class.name
  topics = producer_classes.map { |producer| producer.topic }
  "#{resource_name}: #{topics}"
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Publish batch using the configured producers

Parameters:

  • batch (Array<ActiveRecord::Base>)


169
170
171
172
173
# File 'lib/deimos/utils/db_poller/base.rb', line 169

# Hands the batch to every configured producer via send_events.
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  producer_classes.each { |klass| klass.send_events(batch) }
end

#process_batch_with_span(batch, status) ⇒ Boolean

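Publishes a batch inside a tracing span, retrying when publishing fails. (The source comment for this method is only the lint directive `rubocop:disable Metrics/AbcSize`.)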

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

Returns:

  • (Boolean)


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/deimos/utils/db_poller/base.rb', line 130

# Publishes one batch inside a tracing span, retrying when publishing fails.
#
# Error handling, in order:
# * Size-related Kafka errors are delegated to #handle_message_too_large; the
#   batch is retried unless that method returns true.
# * Any other Kafka::Error is logged and retried every 0.5 seconds with no
#   upper bound.
# * Any other StandardError is logged and retried every 0.5 seconds until
#   @config.retries is reached (no bound when @config.retries is nil); after
#   that the error is recorded on the span and the batch counts as errored.
#
# status.messages_processed is incremented by the batch size in the ensure
# clause, whatever the outcome.
#
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus]
# @return [Boolean] false when the retry limit was exceeded, true otherwise
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    retry unless handle_message_too_large(e, batch, status, span)
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    # The original message ended with a stray "}" after the interpolation.
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if @config.retries.nil? || retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    status.messages_processed += batch.size
  end
  true
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



104
105
106
# File 'lib/deimos/utils/db_poller/base.rb', line 104

# Sends messages for updated data. This base version always raises.
# NOTE(review): presumably subclasses are expected to override this — confirm.
# @raise [Deimos::MissingImplementationError] always
def process_updates
  raise Deimos::MissingImplementationError
end

#producer_classes ⇒ Array<ActiveRecordProducer>

Return array of configured producers depending on poller class

Returns:



183
184
185
186
187
# File 'lib/deimos/utils/db_poller/base.rb', line 183

# Resolves the producers this poller publishes with: the producers declared
# on the poller class when there are any, otherwise a one-element array with
# the constantized @config.producer_class.
# @return [Array] producer classes
def producer_classes
  if self.class.producers.any?
    self.class.producers
  else
    [@config.producer_class.constantize]
  end
end

#retrieve_poll_info ⇒ void

Grab the PollInfo, or create it if it doesn’t exist.

Returns:

  • (void)


79
80
81
# File 'lib/deimos/utils/db_poller/base.rb', line 79

# Loads the PollInfo record for this poller's resource class into @info,
# creating one through #create_poll_info when none is found.
# @return [void]
def retrieve_poll_info
  existing = Deimos::PollInfo.find_by_producer(@resource_class.to_s)
  @info = existing || create_poll_info
end

#should_run? ⇒ Boolean

Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until the poller is ready to run.

Returns:

  • (Boolean)


91
92
93
# File 'lib/deimos/utils/db_poller/base.rb', line 91

# Tells whether enough time has passed since the last send: the time elapsed
# since @info.last_sent, minus the configured delay_time, must be at least
# the configured run_every.
# @return [Boolean]
def should_run?
  since_last_send = Time.zone.now - @info.last_sent
  since_last_send - @config.delay_time >= @config.run_every
end

#start ⇒ void

This method returns an undefined value.

Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/deimos/utils/db_poller/base.rb', line 56

# Runs the polling loop until #stop is called.
#
# Before looping, the producer backend is switched from :kafka_async to
# :kafka, the database connection is reconnected unless a transaction is
# open, and the PollInfo is loaded. Each iteration processes updates when
# #should_run? says so, then sleeps for 0.1 seconds.
# @return [void]
def start
  # Don't send asynchronously
  async_backend = Deimos.config.producers.backend == :kafka_async
  Deimos.config.producers.backend = :kafka if async_backend
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  in_transaction = ActiveRecord::Base.connection.open_transactions.positive?
  ActiveRecord::Base.connection.reconnect! unless in_transaction

  retrieve_poll_info
  loop do
    unless @signal_to_stop
      process_updates if should_run?
      sleep(0.1)
      next
    end

    Deimos.config.logger.info('Shutting down')
    break
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



97
98
99
100
# File 'lib/deimos/utils/db_poller/base.rb', line 97

# Requests that the polling loop stop: logs the request and sets
# @signal_to_stop, which the loop in #start checks at the top of each
# iteration.
# @return [void]
def stop
  Deimos.config.logger.info('Received signal to stop')
  @signal_to_stop = true
end

#validate_producer_class(producer_class) ⇒ void

This method returns an undefined value.

Validate if a producer class is an ActiveRecordProducer or not



191
192
193
194
195
# File 'lib/deimos/utils/db_poller/base.rb', line 191

# Ensures the given class is a subclass of Deimos::ActiveRecordProducer.
#
# @param producer_class [Class] the class to check
# @return [void]
# @raise [RuntimeError] naming the offending class when it is not a subclass
#   of Deimos::ActiveRecordProducer
def validate_producer_class(producer_class)
  return if producer_class < Deimos::ActiveRecordProducer

  # Interpolate the class itself: `producer_class.class.name` is always
  # "Class", which made the message useless for finding the culprit.
  raise "Class #{producer_class} is not an ActiveRecordProducer!"
end