Class: Deimos::Utils::DbPoller::TimeBased
- Defined in: lib/deimos/utils/db_poller/time_based.rb
Overview
Poller that uses ID and updated_at to determine the records to publish.
Constant Summary
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #create_poll_info ⇒ Object
  Creates the initial poll info record for this poller (marked :nodoc: in the source).
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_and_touch_info(batch, status) ⇒ Object
- #process_updates ⇒ void
  Send messages for updated data.
- #touch_info(batch) ⇒ void
Methods inherited from Base
#handle_message_too_large, #initialize, #log_identifier, #process_batch, #process_batch_with_span, #producer_classes, producers, #retrieve_poll_info, #should_run?, #start, #stop, #validate_producer_class
Constructor Details
This class inherits a constructor from Deimos::Utils::DbPoller::Base
Instance Method Details
#create_poll_info ⇒ Object
:nodoc:
12 13 14 15 16 17 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12
# Creates the Deimos::PollInfo record for this poller's resource class.
#
# `last_sent` starts at `Time.new(0)` when the config's
# `start_from_beginning` flag is set, and at the current `Time.zone` time
# otherwise. `last_sent_id` always starts at 0.
#
# @return [Object] whatever `Deimos::PollInfo.create!` returns
def create_poll_info
  initial_time = if @config.start_from_beginning
                   Time.new(0)
                 else
                   Time.zone.now
                 end

  Deimos::PollInfo.create!(
    producer: @resource_class.to_s,
    last_sent: initial_time,
    last_sent_id: 0
  )
end
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 56
# Builds the query for one batch of records inside the polling window.
#
# The resource class's `poll_query` supplies the base relation; this method
# caps it at BATCH_SIZE rows and orders it by the timestamp column, then by
# the record class's primary key.
#
# NOTE(review): the accessor after `@config.` was missing from this listing;
# `timestamp_column` is restored from Deimos' db_poller settings -- confirm
# against the file on disk.
#
# @param time_from [Object] lower bound passed to `poll_query` as `time_from:`
# @param time_to [Object] upper bound passed to `poll_query` as `time_to:`
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  id = producer_classes.first.config[:record_class].primary_key
  connection = ActiveRecord::Base.connection
  # Quote both column names because they are interpolated into the ORDER BY.
  quoted_timestamp = connection.quote_column_name(@config.timestamp_column)
  quoted_id = connection.quote_column_name(id)

  @resource_class.poll_query(time_from: time_from,
                             time_to: time_to,
                             column_name: @config.timestamp_column,
                             min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
70 71 72 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 70
# Reads the polling timestamp from a record by calling the configured
# timestamp column as a method on it.
#
# NOTE(review): the accessor after `@config.` was missing from this listing;
# `timestamp_column` is restored from Deimos' db_poller settings -- confirm
# against the file on disk.
#
# @param record [Object] a record from a fetched batch
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  record.public_send(@config.timestamp_column)
end
#process_and_touch_info(batch, status) ⇒ Object
21 22 23 24 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21
# Processes one batch, then records the batch's last record in the poll info
# so the next fetch resumes after it.
#
# @param batch [Array] records to process; the last one is used as the checkpoint
# @param status [Object] poll status object, passed through to `process_batch_with_span`
# @return [Object] the result of `touch_info`
def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  touch_info(batch)
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28
# Send messages for updated data.
#
# Polls in batches from the last checkpoint (or from `Time.new(0)` when the
# config's `full_table` flag is set) up to "now minus the configured delay",
# processing each batch and moving the lower bound to the last record's
# timestamp until a fetch comes back empty.
#
# @return [void]
def process_updates
  window_start = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  window_end = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{log_identifier} from #{window_start} to #{window_end}")

  status = PollStatus.new(0, 0, 0)
  published_any = false

  # The producer's poll_query decides which rows are relevant; keep pulling
  # batches until one comes back empty.
  loop do
    Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
    records = fetch_results(window_start, window_end).to_a
    break if records.empty?

    published_any = true
    process_and_touch_info(records, status)
    window_start = last_updated(records.last)
  end

  # Nothing was found at all: bump last_sent anyway so there is still a wait
  # before the next poll.
  @info.touch(:last_sent) unless published_any
  Deimos.config.logger.info("Poll #{log_identifier} complete at #{window_end} (#{status.report})")
end
#touch_info(batch) ⇒ void
This method returns an undefined value.
76 77 78 79 80 81 82 83 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 76
# Saves the batch's last record as the new checkpoint: its timestamp goes
# into `last_sent` and its primary key value into `last_sent_id`.
#
# @param batch [Array] processed records; only the last one is read
# @return [void]
def touch_info(batch)
  newest = batch.last
  # Look the primary key up on the record's own class, then read its value.
  newest_id = newest.public_send(newest.class.primary_key)

  @info.attributes = {
    last_sent: last_updated(newest),
    last_sent_id: newest_id
  }
  @info.save!
end