Class: Deimos::Utils::DbPoller::Base
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller::Base
- Defined in:
- lib/deimos/utils/db_poller/base.rb
Overview
Base poller class for retrieving and publishing messages.
Direct Known Subclasses
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
- #config ⇒ Hash readonly
-
#id ⇒ String
readonly
Needed for Executor so it can identify the worker.
Class Method Summary collapse
-
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics.
Instance Method Summary collapse
- #create_poll_info ⇒ Deimos::PollInfo
- #handle_message_too_large(exception, batch, status, span) ⇒ Boolean
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
-
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses.
-
#process_batch(batch) ⇒ void
Publish batch using the configured producers.
-
#process_batch_with_span(batch, status) ⇒ Boolean
Publish a batch inside a tracing span, retrying on errors; returns false only once the configured retries are exhausted.
-
#process_updates ⇒ void
Send messages for updated data.
-
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ void
Stop the poll.
-
#validate_producer_class(producer_class) ⇒ void
Validate if a producer class is an ActiveRecordProducer or not.
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/deimos/utils/db_poller/base.rb', line 32
# Set up a poller from its configuration and verify its producers.
#
# Stores the config, assigns a random hex id, picks the resource class
# (the poller class itself when it declares producers, otherwise the
# constantized config.producer_class) and validates every producer class.
#
# @param config [Object] must respond to #poller_class and #producer_class
# @raise [RuntimeError] when neither poller_class nor producer_class is set,
#   when a NameError occurs while resolving or validating the producers
#   (re-raised as "Class ... not found!"), or when validation fails
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    nothing_configured = @config.poller_class.nil? && @config.producer_class.nil?
    raise 'No producers have been set for this DB poller!' if nothing_configured

    @resource_class =
      if self.class.producers.any?
        self.class
      else
        @config.producer_class.constantize
      end
    producer_classes.each { |klass| validate_producer_class(klass) }
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
end
Instance Attribute Details
#config ⇒ Hash (readonly)
22 23 24 |
# Read-only accessor: returns the @config value stored by the constructor.
# File 'lib/deimos/utils/db_poller/base.rb', line 22 def config @config end |
#id ⇒ String (readonly)
Needed for Executor so it can identify the worker
19 20 21 |
# Read-only accessor: returns @id, which the constructor sets to SecureRandom.hex
# (a random hexadecimal String).
# File 'lib/deimos/utils/db_poller/base.rb', line 19 def id @id end |
Class Method Details
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized
27 28 29 |
# Default implementation returns an empty array. While it is empty,
# #producer_classes falls back to the single producer_class named in the config.
# File 'lib/deimos/utils/db_poller/base.rb', line 27 def self.producers [] end |
Instance Method Details
#create_poll_info ⇒ Deimos::PollInfo
84 85 86 |
# File 'lib/deimos/utils/db_poller/base.rb', line 84
# Create the PollInfo row for this poller's resource class.
#
# last_sent is initialised to Time.new(0).
#
# @return [Deimos::PollInfo] the newly created record
def create_poll_info
  producer_name = @resource_class.to_s
  initial_last_sent = Time.new(0)
  Deimos::PollInfo.create!(producer: producer_name, last_sent: initial_last_sent)
end
#handle_message_too_large(exception, batch, status, span) ⇒ Boolean
113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/deimos/utils/db_poller/base.rb', line 113
# React to a "too large" publishing error for one batch.
#
# Always logs the error. When the poller is configured to skip oversized
# messages, the batch is logged as skipped, the error is recorded on the
# tracing span, and the errored-batch counter is incremented. Otherwise the
# method sleeps briefly so the caller can retry.
#
# NOTE(review): the method name, `exception.message` and the config option
# name were missing from this listing. The option name
# `skip_too_large_messages` does not appear elsewhere on this page and should
# be confirmed against the Deimos DB poller configuration.
#
# @param exception [Exception] the error raised while publishing
# @param batch [Array<#id>] records in the failed batch
# @param status [#batches_errored, #batches_errored=] counters for this poll
# @param span [Object, nil] tracing span for the batch
# @return [Boolean] true when the batch was skipped, false when the caller
#   should retry
def handle_message_too_large(exception, batch, status, span)
  Deimos.config.logger.error("Error publishing through DB Poller: #{exception.message}")
  if @config.skip_too_large_messages
    Deimos.config.logger.error("Skipping messages #{batch.map(&:id).join(', ')} since they are too large")
    Deimos.config.tracer&.set_error(span, exception)
    status.batches_errored += 1
    true
  else # do the same thing as regular Kafka::Error
    sleep(0.5)
    false
  end
end
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses
177 178 179 |
# File 'lib/deimos/utils/db_poller/base.rb', line 177
# Build the label used in log messages: the resource class name followed by
# the list of topics of every configured producer.
#
# @return [String]
def log_identifier
  resource_name = @resource_class.name
  topics = producer_classes.map { |producer| producer.topic }
  "#{resource_name}: #{topics}"
end
#process_batch(batch) ⇒ void
This method returns an undefined value.
Publish batch using the configured producers
169 170 171 172 173 |
# File 'lib/deimos/utils/db_poller/base.rb', line 169
# Hand the batch to every configured producer via send_events.
#
# @param batch [Object] the records to publish; passed through unchanged
# @return [void]
def process_batch(batch)
  producer_classes.each { |producer| producer.send_events(batch) }
end
#process_batch_with_span(batch, status) ⇒ Boolean
Publish a batch inside a tracing span, retrying on errors; returns false only once the configured retries are exhausted.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/deimos/utils/db_poller/base.rb', line 130
# Publish one batch inside a tracing span, retrying on failure.
#
# Error handling, in rescue order:
# * Kafka::BufferOverflow / MessageSizeTooLarge / RecordListTooLarge are
#   delegated to #handle_message_too_large; the batch is retried unless that
#   method reports it was skipped.
# * Any other Kafka::Error is logged and retried indefinitely after 0.5s.
# * Any other StandardError is logged and retried after 0.5s while the retry
#   count is below config.retries (unlimited when retries is nil). After that
#   the error is recorded on the span, the errored-batch counter is
#   incremented and false is returned.
#
# NOTE(review): several identifiers were missing from this listing. The
# counter name `messages_processed` in the ensure clause does not appear
# elsewhere on this page and should be confirmed against the poll status
# struct.
#
# @param batch [#size] the records to publish
# @param status [Object] counters for this poll (batches_processed,
#   batches_errored, messages_processed)
# @return [Boolean] false only when StandardError retries are exhausted
# rubocop:disable Metrics/AbcSize
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge => e
    retry unless handle_message_too_large(e, batch, status, span)
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    # The original message ended with a stray "}" after the interpolation.
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if @config.retries.nil? || retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    status.messages_processed += batch.size
  end
  true
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
104 105 106 |
# The base implementation always raises Deimos::MissingImplementationError.
# #start calls this method whenever #should_run? returns true.
# File 'lib/deimos/utils/db_poller/base.rb', line 104 def process_updates raise Deimos::MissingImplementationError end |
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class
183 184 185 186 187 |
# File 'lib/deimos/utils/db_poller/base.rb', line 183
# Resolve the producers this poller publishes through.
#
# Producers declared on the poller class (via .producers) take precedence;
# otherwise the single producer class named in the config is constantized.
#
# @return [Array] the producer classes
def producer_classes
  if self.class.producers.any?
    self.class.producers
  else
    [@config.producer_class.constantize]
  end
end
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
79 80 81 |
# File 'lib/deimos/utils/db_poller/base.rb', line 79
# Load the PollInfo row for this poller's resource class into @info,
# creating it through #create_poll_info when none exists yet.
#
# @return [void]
def retrieve_poll_info
  existing_info = Deimos::PollInfo.find_by_producer(@resource_class.to_s)
  @info = existing_info || create_poll_info
end
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it’s ready.
91 92 93 |
# File 'lib/deimos/utils/db_poller/base.rb', line 91
# Decide whether enough time has passed to poll again.
#
# The time since @info.last_sent, minus the configured delay_time, must be
# at least the configured run_every interval.
#
# @return [Boolean]
def should_run?
  elapsed_after_delay = Time.zone.now - @info.last_sent - @config.delay_time
  elapsed_after_delay >= @config.run_every
end
#start ⇒ void
This method returns an undefined value.
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/deimos/utils/db_poller/base.rb', line 56
# Run the polling loop until #stop sets the stop flag.
#
# Switches the producer backend from :kafka_async to :kafka, reconnects the
# ActiveRecord connection unless a transaction is open, loads the PollInfo,
# then loops: process updates when #should_run? allows, sleep 0.1s, repeat.
#
# @return [void]
def start
  # Don't send asynchronously
  Deimos.config.producers.backend = :kafka if Deimos.config.producers.backend == :kafka_async
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  inside_transaction = ActiveRecord::Base.connection.open_transactions.positive?
  ActiveRecord::Base.connection.reconnect! unless inside_transaction

  retrieve_poll_info
  loop do
    # The stop flag is checked once per iteration, before any work is done.
    if @signal_to_stop
      Deimos.config.logger.info('Shutting down')
      break
    end

    process_updates if should_run?
    sleep(0.1)
  end
end
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
97 98 99 100 |
# Logs the stop request and sets @signal_to_stop; the loop in #start checks
# that flag at the top of each iteration and exits when it is set.
# File 'lib/deimos/utils/db_poller/base.rb', line 97 def stop Deimos.config.logger.info('Received signal to stop') @signal_to_stop = true end |
#validate_producer_class(producer_class) ⇒ void
This method returns an undefined value.
Validate if a producer class is an ActiveRecordProducer or not
191 192 193 194 195 |
# File 'lib/deimos/utils/db_poller/base.rb', line 191
# Ensure a producer class is a subclass of Deimos::ActiveRecordProducer.
#
# The error message interpolates the producer class itself. The previous
# `producer_class.class.name` always rendered as "Class", so the message
# never identified the offending producer.
#
# @param producer_class [Class] the producer to check
# @return [void]
# @raise [RuntimeError] when producer_class is not a strict subclass of
#   Deimos::ActiveRecordProducer
def validate_producer_class(producer_class)
  # Module#< is true only for strict descendants; unrelated classes give nil.
  return if producer_class < Deimos::ActiveRecordProducer

  raise "Class #{producer_class} is not an ActiveRecordProducer!"
end