Class: Deimos::ActiveRecordProducer

Inherits:
Producer
  • Object
Defined in:
lib/deimos/active_record_producer.rb

Overview

Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call send_events on a list of records and they will be auto-published. You can override generate_payload to make changes to the payload before it’s published.

You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.

Constant Summary

Constants inherited from Producer

Producer::MAX_BATCH_SIZE

Class Method Summary

Methods inherited from Producer

determine_backend_class, karafka_config, partition_key, produce, produce_batch, publish, publish_list, topic

Class Method Details

.config ⇒ Object



# File 'lib/deimos/active_record_producer.rb', line 57

def config
  Deimos.karafka_configs.find { |t| t.producer_classes.include?(self) }
end

.encoder ⇒ Object



# File 'lib/deimos/active_record_producer.rb', line 61

def encoder
  raise "No schema or namespace configured for #{self.name}" if config.nil?
  config.deserializers[:payload].backend
end

.generate_payload(attributes, _record) ⇒ Hash

Generate the payload, given a list of attributes or a record. Can be overridden or added to by subclasses.



# File 'lib/deimos/active_record_producer.rb', line 72

def generate_payload(attributes, _record)
  fields = self.encoder.schema_fields
  payload = attributes.stringify_keys
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !fields.map(&:name).include?(k)
  end
  return payload unless self.config.use_schema_classes

  Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
end
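
The filtering step in generate_payload can be illustrated without Deimos or ActiveSupport. The following is a minimal sketch, not the library's implementation: SCHEMA_FIELD_NAMES and filter_payload are hypothetical names, the field list stands in for whatever encoder.schema_fields.map(&:name) would return, and transform_keys(&:to_s) stands in for stringify_keys.

```ruby
# Hypothetical stand-in for the names returned by
# encoder.schema_fields.map(&:name) in the real producer.
SCHEMA_FIELD_NAMES = %w[id name].freeze

# Keep only the keys the schema defines, plus the special payload_key entry.
def filter_payload(attributes, field_names = SCHEMA_FIELD_NAMES)
  # Plain-Ruby stand-in for ActiveSupport's stringify_keys.
  payload = attributes.transform_keys(&:to_s)
  payload.select { |key, _| key == 'payload_key' || field_names.include?(key) }
end

filtered = filter_payload(
  id: 1,
  name: 'widget',
  internal_notes: 'not in the schema',
  payload_key: 'abc'
)
puts filtered.inspect
```

Here internal_notes is dropped because it is not a schema field, while payload_key survives even though it is not in the schema either.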

.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation

Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. Rows whose column value equals time_from are returned only if their primary key is greater than min_id.



# File 'lib/deimos/active_record_producer.rb', line 91

def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:)
  klass = @record_class
  table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
  column = ActiveRecord::Base.connection.quote_column_name(column_name)
  primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
  klass.where(
    "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
     OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
    time_from,
    min_id,
    time_from,
    time_to
  )
end
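
To make the WHERE clause easier to reason about, here is a sketch that restates the same condition as a plain Ruby predicate over in-memory rows. Row and in_poll_window? are hypothetical names used only for illustration; the real method builds SQL and returns an ActiveRecord::Relation.

```ruby
# Hypothetical in-memory row; updated_at plays the role of column_name.
Row = Struct.new(:id, :updated_at)

# Mirrors the SQL condition:
#   ((column = time_from AND primary > min_id) OR column > time_from)
#   AND column <= time_to
def in_poll_window?(row, time_from:, time_to:, min_id:)
  same_instant_newer_id = row.updated_at == time_from && row.id > min_id
  strictly_later = row.updated_at > time_from
  (same_instant_newer_id || strictly_later) && row.updated_at <= time_to
end

t0 = Time.utc(2024, 1, 1, 12, 0, 0)
rows = [
  Row.new(1, t0),       # same timestamp, id not above min_id: skipped
  Row.new(2, t0),       # same timestamp, id above min_id: selected
  Row.new(3, t0 + 30),  # inside the window: selected
  Row.new(4, t0 + 120)  # later than time_to: not selected in this window
]

selected_ids = rows
  .select { |row| in_poll_window?(row, time_from: t0, time_to: t0 + 60, min_id: 1) }
  .map(&:id)
puts selected_ids.inspect
```

The primary key comparison only matters for rows that share the time_from timestamp, which is how a poller can resume in the middle of a group of rows updated at the same instant.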

.post_process(_records) ⇒ Object

Post-process records after publishing. The default implementation does nothing; override it in a subclass as needed.



# File 'lib/deimos/active_record_producer.rb', line 108

def post_process(_records)
end

.record_class(klass = nil, refetch: true) ⇒ void

This method returns an undefined value.

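Indicate the class this producer is working on. If refetch is true and send_events is given a hash instead of a record object, the record is refetched and passed into the generate_payload method. When called without a class, it returns the class that was previously set.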



# File 'lib/deimos/active_record_producer.rb', line 21

def record_class(klass=nil, refetch: true)
  return @record_class if klass.nil?

  @record_class = klass
  @refetch_record = refetch
end
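
Because record_class is both a setter and a getter, it is typically called in the class body, macro style. The sketch below copies only this one method into a stand-in base class so it runs without Deimos; SketchProducer, Widget, and WidgetProducer are hypothetical names.

```ruby
# Stand-in base class that copies only the record_class method shown above.
class SketchProducer
  def self.record_class(klass = nil, refetch: true)
    return @record_class if klass.nil?

    @record_class = klass
    @refetch_record = refetch
  end
end

# Hypothetical model class; in a real application this would be an
# ActiveRecord model.
Widget = Class.new

class WidgetProducer < SketchProducer
  # Setter form: declare the model and opt out of refetching.
  record_class Widget, refetch: false
end

# Getter form: no arguments returns the configured class.
configured = WidgetProducer.record_class
puts configured.inspect
```

The values are stored in class-level instance variables, so each producer subclass keeps its own setting and the base class is left unconfigured.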

.send_event(record, force_send: false) ⇒ void



# File 'lib/deimos/active_record_producer.rb', line 31

def send_event(record, force_send: false)
  send_events([record], force_send: force_send)
end

.send_events(records, force_send: false) ⇒ void



# File 'lib/deimos/active_record_producer.rb', line 38

def send_events(records, force_send: false)
  return if Deimos.producers_disabled?(self)

  primary_key = @record_class&.primary_key
  messages = records.map do |record|
    if record.respond_to?(:attributes)
      attrs = record.attributes.with_indifferent_access
    else
      attrs = record.with_indifferent_access
      if @refetch_record && attrs[primary_key]
        record = @record_class.find(attrs[primary_key])
      end
    end
    generate_payload(attrs, record).with_indifferent_access
  end
  self.publish_list(messages, force_send: force_send)
  self.post_process(records)
end
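
The first branch in send_events decides whether each item is a record object or a plain hash of attributes. As a rough sketch of that normalization step only (no refetching, publishing, or post-processing), the following uses a hypothetical Record struct and normalize helper, with transform_keys(&:to_s) standing in for ActiveSupport's with_indifferent_access.

```ruby
# Hypothetical record type that responds to #attributes like a model would.
Record = Struct.new(:id, :name) do
  def attributes
    to_h.transform_keys(&:to_s)
  end
end

# Accept either a record-like object or a hash and return string-keyed
# attributes. Simplified stand-in for with_indifferent_access.
def normalize(record)
  source = record.respond_to?(:attributes) ? record.attributes : record
  source.transform_keys(&:to_s)
end

mixed = [Record.new(1, 'first'), { id: 2, name: 'second' }]
normalized = mixed.map { |item| normalize(item) }
puts normalized.inspect
```

Both inputs end up in the same shape, which is why a list of hashes, such as the rows passed to activerecord-import, can be sent through the same method as a list of records.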

.watched_attributes(_record) ⇒ Array<String>

Override this in active record producers to add non-schema fields to check for updates



# File 'lib/deimos/active_record_producer.rb', line 115

def watched_attributes(_record)
  self.encoder.schema_fields.map(&:name)
end
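
An override usually extends the default list, not replaces it. The sketch below shows that pattern against a stand-in base class so it runs without Deimos; SketchProducer, schema_field_names, WidgetProducer, and the category_id field are all hypothetical.

```ruby
# Stand-in base class. schema_field_names is a hypothetical substitute for
# encoder.schema_fields.map(&:name) in the real producer.
class SketchProducer
  def self.schema_field_names
    %w[id name]
  end

  def self.watched_attributes(_record)
    schema_field_names
  end
end

class WidgetProducer < SketchProducer
  # Also watch a column that is not part of the schema.
  def self.watched_attributes(_record)
    super + ['category_id']
  end
end

watched = WidgetProducer.watched_attributes(nil)
puts watched.inspect
```

Calling super keeps the schema fields in the list, so only the extra non-schema column needs to be named in the subclass.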