Class: Deimos::Utils::DbPoller::TimeBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/time_based.rb

Overview

Poller that uses ID and updated_at to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#handle_message_too_large, #initialize, #log_identifier, #process_batch, #process_batch_with_span, #producer_classes, producers, #retrieve_poll_info, #should_run?, #start, #stop, #validate_producer_class

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#create_poll_info ⇒ Object

:nodoc:



12
13
14
15
16
17
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12

# Creates the poll-info record for this poller's resource class.
#
# The initial `last_sent` is `Time.new(0)` when the config has
# `start_from_beginning` set, otherwise the current zone time. The initial
# `last_sent_id` is always 0.
#
# @return [Object] whatever `Deimos::PollInfo.create!` returns
def create_poll_info
  starting_point = if @config.start_from_beginning
                     Time.new(0)
                   else
                     Time.zone.now
                   end
  Deimos::PollInfo.create!(
    producer: @resource_class.to_s,
    last_sent: starting_point,
    last_sent_id: 0
  )
end

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


56
57
58
59
60
61
62
63
64
65
66
# File 'lib/deimos/utils/db_poller/time_based.rb', line 56

# Builds the query for one batch of records to publish.
#
# Calls `poll_query` on the resource class with the given time window, the
# configured timestamp column and `@info.last_sent_id` as `min_id`, then caps
# the result at BATCH_SIZE and orders it by the quoted timestamp column
# followed by the quoted primary key of the first producer's record class.
#
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  record_class = producer_classes.first.config[:record_class]
  pk_column = record_class.primary_key
  # Quote both column names before interpolating them into the ORDER BY.
  timestamp_sql = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  pk_sql = ActiveRecord::Base.connection.quote_column_name(pk_column)

  candidates = @resource_class.poll_query(
    time_from: time_from,
    time_to: time_to,
    column_name: @config.timestamp_column,
    min_id: @info.last_sent_id
  )
  candidates.limit(BATCH_SIZE).order("#{timestamp_sql}, #{pk_sql}")
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


70
71
72
# File 'lib/deimos/utils/db_poller/time_based.rb', line 70

# Reads the configured timestamp column from a record.
#
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone] the value of the record's
#   `@config.timestamp_column` attribute
def last_updated(record)
  timestamp_reader = @config.timestamp_column
  record.public_send(timestamp_reader)
end

#process_and_touch_info(batch, status) ⇒ Object

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

21
22
23
24
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21

# Processes one batch and then records its last record in the poll info.
#
# @param batch [Array<ActiveRecord::Base>] records to process
# @param status [PollStatus] status object handed to process_batch_with_span
# @return [Object] whatever touch_info returns
def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  # Only reached when processing did not raise.
  touch_info(batch)
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28

# Send messages for updated data.
#
# Polls from the last-sent time (or from `Time.new(0)` when `full_table` is
# configured) up to "now minus the configured delay", one batch at a time,
# until a fetch comes back empty.
#
# @return [void]
def process_updates
  window_start = if @config.full_table
                   Time.new(0)
                 else
                   @info.last_sent.in_time_zone
                 end
  window_end = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{log_identifier} from #{window_start} to #{window_end}")
  status = PollStatus.new(0, 0, 0)
  published_any = false

  # fetch_results pulls the relevant rows via the producer-defined poll_query.
  loop do
    Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
    records = fetch_results(window_start, window_end).to_a
    break if records.empty?

    published_any = true
    process_and_touch_info(records, status)
    # The next fetch starts from the timestamp of the last record just handled.
    window_start = last_updated(records.last)
  end

  # With no results at all, still bump last_sent so the next poll waits
  # before running again.
  @info.touch(:last_sent) unless published_any
  Deimos.config.logger.info("Poll #{log_identifier} complete at #{window_end} (#{status.report})")
end

#touch_info(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


76
77
78
79
80
81
82
83
# File 'lib/deimos/utils/db_poller/time_based.rb', line 76

# Stores the last record of the batch as the new polling position.
#
# Writes that record's timestamp (via last_updated) and primary-key value into
# `@info` as `last_sent` / `last_sent_id`, then saves with `save!`.
#
# @param batch [Array<ActiveRecord::Base>] non-empty batch; callers in this
#   class only pass batches that were checked for emptiness first
# @return [void]
def touch_info(batch)
  newest = batch.last
  newest_id = newest.public_send(newest.class.primary_key)
  @info.attributes = {
    last_sent: last_updated(newest),
    last_sent_id: newest_id
  }
  @info.save!
end