Class: Deimos::Utils::DbPoller::StateBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/state_based.rb

Overview

Poller that uses state columns to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#create_poll_info, #initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#fetch_resultsActiveRecord::Relation

Returns:

  • (ActiveRecord::Relation)


33
34
35
# File 'lib/deimos/utils/db_poller/state_based.rb', line 33

def fetch_results
  @producer.poll_query.limit(BATCH_SIZE).order(@config.timestamp_column)
end

#finalize_batch(batch, success) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • success (Boolean)


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/deimos/utils/db_poller/state_based.rb', line 40

def finalize_batch(batch, success)
  @info.touch(:last_sent)

  state = success ? @config.published_state : @config.failed_state
  klass = batch.first.class
  id_col = klass.primary_key.to_sym
  timestamp_col = @config.timestamp_column

  attrs = { timestamp_col => Time.zone.now }
  attrs[@config.state_column] = state if state
  if @config.publish_timestamp_column
    attrs[@config.publish_timestamp_column] = Time.zone.now
  end

  klass.where(id_col => batch.map(&id_col)).update_all(attrs)
end

#process_updatesvoid

This method returns an undefined value.

Send messages for updated data.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/deimos/utils/db_poller/state_based.rb', line 12

def process_updates
  Deimos.config.logger.info("Polling #{@producer.topic}")
  status = PollStatus.new(0, 0, 0)

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
    batch = fetch_results.to_a
    if batch.empty?
      @info.touch(:last_sent)
      break
    end

    success = process_batch_with_span(batch, status)
    finalize_batch(batch, success)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete (#{status.report}")
end