Class: Deimos::Utils::DbPoller::StateBased
- Defined in: lib/deimos/utils/db_poller/state_based.rb
Overview
Poller that uses state columns to determine the records to publish.
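As a rough illustration of the idea, the sketch below selects records by a state column in plain Ruby. The `Order` struct, its `state` field, and the "pending"/"published" values are hypothetical; in Deimos the actual selection comes from the producer's `poll_query`.

```ruby
# Hypothetical record with a state column; not part of Deimos.
Order = Struct.new(:id, :state)

orders = [
  Order.new(1, "pending"),
  Order.new(2, "published"),
  Order.new(3, "pending")
]

# Assumption: records still awaiting publication are marked "pending".
to_publish = orders.select { |order| order.state == "pending" }
to_publish_ids = to_publish.map(&:id)
puts to_publish_ids.inspect
```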
Constant Summary
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #fetch_results ⇒ ActiveRecord::Relation
- #finalize_batch(batch, success) ⇒ void
- #process_updates ⇒ void
  Send messages for updated data.
Methods inherited from Base
#create_poll_info, #handle_message_too_large, #initialize, #log_identifier, #process_batch, #process_batch_with_span, #producer_classes, producers, #retrieve_poll_info, #should_run?, #start, #stop, #validate_producer_class
Constructor Details
This class inherits a constructor from Deimos::Utils::DbPoller::Base
Instance Method Details
#fetch_results ⇒ ActiveRecord::Relation
# File 'lib/deimos/utils/db_poller/state_based.rb', line 36
def fetch_results
  @resource_class.poll_query.limit(BATCH_SIZE).order(@config.)
end
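The query above applies an ordering and a batch limit to the producer's `poll_query`. A minimal in-memory analogue, assuming a hypothetical `Row` struct, an `updated_at` ordering column, and a batch size of 2 (the real ordering column is a `@config` setting not legible in the listing), might look like this:

```ruby
# Hypothetical row type; updated_at stands in for the ordering column.
Row = Struct.new(:id, :updated_at)

BATCH = 2 # hypothetical stand-in for BATCH_SIZE

rows = [Row.new(1, 30), Row.new(2, 10), Row.new(3, 20)]

# Order first, then keep at most `size` rows, as ORDER BY ... LIMIT would.
def fetch_batch(rows, size)
  rows.sort_by(&:updated_at).first(size)
end

batch_ids = fetch_batch(rows, BATCH).map(&:id)
puts batch_ids.inspect
```

Ordering before limiting means each batch holds the oldest rows by the chosen column, so older changes are handled before newer ones.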
#finalize_batch(batch, success) ⇒ void
This method returns an undefined value.
# File 'lib/deimos/utils/db_poller/state_based.rb', line 43
def finalize_batch(batch, success)
  @info.touch(:last_sent)

  state = success ? @config.published_state : @config.failed_state
  klass = batch.first.class
  id_col = klass.primary_key.to_sym
   = @config.

  attrs = { => Time.zone.now }
  attrs[@config.state_column] = state if state
  if @config.
    attrs[@config.] = Time.zone.now
  end

  klass.where(id_col => batch.map(&id_col)).update_all(attrs)
end
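The attribute hash built in `finalize_batch` can be sketched as a pure function. This is a simplified illustration, not the library's code: `PollerConfig`, `build_update_attrs`, and the `time_column` and `published_at_column` fields are hypothetical stand-ins for settings that are not legible in the listing above. Only `state_column`, `published_state`, and `failed_state` appear in the source.

```ruby
# Hypothetical config; time_column and published_at_column are stand-ins
# for settings that are elided in the listing above.
PollerConfig = Struct.new(:state_column, :published_state, :failed_state,
                          :time_column, :published_at_column,
                          keyword_init: true)

# Builds the attributes applied to every record in a finished batch.
def build_update_attrs(config, success, now)
  state = success ? config.published_state : config.failed_state
  attrs = { config.time_column => now }
  # The state column is only written when a state is configured.
  attrs[config.state_column] = state if state
  attrs[config.published_at_column] = now if config.published_at_column
  attrs
end

config = PollerConfig.new(state_column: :status, published_state: "published",
                          failed_state: nil, time_column: :updated_at,
                          published_at_column: nil)
now = Time.at(0)
ok_attrs = build_update_attrs(config, true, now)
failed_attrs = build_update_attrs(config, false, now)
puts ok_attrs.inspect
puts failed_attrs.inspect
```

With no failed state configured, a failed batch leaves the state column untouched and only the time column is updated.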
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
# File 'lib/deimos/utils/db_poller/state_based.rb', line 12
def process_updates
  Deimos.config.logger.info("Polling #{log_identifier}")
  status = PollStatus.new(0, 0, 0)
  first_batch = true

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
    batch = fetch_results.to_a
    break if batch.empty?

    first_batch = false
    success = process_batch_with_span(batch, status)
    finalize_batch(batch, success)
  end

  # If there were no results at all, we update last_sent so that we still get a wait
  # before the next poll.
  @info.touch(:last_sent) if first_batch
  Deimos.config.logger.info("Poll #{log_identifier} complete (#{status.report}")
end
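The control flow of `process_updates` can be exercised in isolation by injecting the collaborators as callables. The sketch below is an assumption-laden simplification: `run_poll` and its keyword arguments are hypothetical, and logging and `PollStatus` tracking are left out.

```ruby
# Runs the poll loop against injected callables and returns the number of
# batches handled. All names here are hypothetical, not Deimos API.
def run_poll(fetch:, process:, finalize:, touch:)
  first_batch = true
  batches = 0
  loop do
    batch = fetch.call
    break if batch.empty?

    first_batch = false
    success = process.call(batch)
    finalize.call(batch, success)
    batches += 1
  end
  # No results at all: still record the poll time.
  touch.call if first_batch
  batches
end

queue = [[1, 2], [3]] # hypothetical pending batches
finalized = []
touched = 0

count = run_poll(
  fetch: -> { queue.shift || [] },
  process: ->(_batch) { true },
  finalize: ->(batch, success) { finalized << [batch, success] },
  touch: -> { touched += 1 }
)

empty_touched = 0
empty_count = run_poll(
  fetch: -> { [] },
  process: ->(_batch) { true },
  finalize: ->(_batch, _success) {},
  touch: -> { empty_touched += 1 }
)
puts [count, touched, empty_count, empty_touched].inspect
```

Here the queue shrinks on every fetch, so the loop ends. In the listing, `fetch_results` re-runs the same query on each iteration, so termination presumably relies on `finalize_batch` updating the rows so that they no longer match the producer's `poll_query`.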