Class: Deimos::Utils::DbPoller::StateBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/state_based.rb

Overview

Poller that uses state columns to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#create_poll_info, #handle_message_too_large, #initialize, #log_identifier, #process_batch, #process_batch_with_span, #producer_classes, producers, #retrieve_poll_info, #should_run?, #start, #stop, #validate_producer_class

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#fetch_results ⇒ ActiveRecord::Relation

Returns:

  • (ActiveRecord::Relation)


36
37
38
# File 'lib/deimos/utils/db_poller/state_based.rb', line 36

# Builds the query for the next batch of records to publish: the resource
# class's poll_query, capped at BATCH_SIZE rows and ordered by the
# configured timestamp column.
#
# @return [ActiveRecord::Relation]
def fetch_results
  pending = @resource_class.poll_query
  capped = pending.limit(BATCH_SIZE)
  capped.order(@config.timestamp_column)
end

#finalize_batch(batch, success) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • success (Boolean)


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/deimos/utils/db_poller/state_based.rb', line 43

# Records the outcome of a processed batch: touches last_sent on the poll
# info, then bulk-updates every record in the batch with the configured
# timestamp, the published/failed state (when one is configured for the
# outcome) and, if configured, the publish timestamp.
#
# @param batch [Array<ActiveRecord::Base>]
# @param success [Boolean]
# @return [void]
def finalize_batch(batch, success)
  @info.touch(:last_sent)
  # Nothing to update; also avoids calling primary_key on NilClass.
  return if batch.empty?

  # Capture the time once so every column written by this update carries
  # exactly the same instant.
  now = Time.zone.now

  attrs = { @config.timestamp_column => now }
  state = success ? @config.published_state : @config.failed_state
  # A nil state means none is configured for this outcome; leave the
  # state column untouched in that case.
  attrs[@config.state_column] = state if state
  attrs[@config.publish_timestamp_column] = now if @config.publish_timestamp_column

  # All records are assumed to share the class of the first one.
  klass = batch.first.class
  id_col = klass.primary_key.to_sym
  klass.where(id_col => batch.map(&id_col)).update_all(attrs)
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/utils/db_poller/state_based.rb', line 12

# Send messages for updated data. Repeatedly fetches a batch, processes it
# and finalizes it with the result, until a fetch returns no records.
#
# @return [void]
def process_updates
  Deimos.config.logger.info("Polling #{log_identifier}")
  status = PollStatus.new(0, 0, 0)
  first_batch = true

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
    batch = fetch_results.to_a
    break if batch.empty?

    first_batch = false
    success = process_batch_with_span(batch, status)
    finalize_batch(batch, success)
  end

  # If there were no results at all, we update last_sent so that we still get a wait
  # before the next poll.
  @info.touch(:last_sent) if first_batch
  # The closing parenthesis balances the one opened before the report.
  Deimos.config.logger.info("Poll #{log_identifier} complete (#{status.report})")
end