Class: Deimos::ActiveRecordProducer

Inherits:
Producer
  • Object
show all
Defined in:
lib/deimos/active_record_producer.rb

Overview

Class which automatically produces a record when given an ActiveRecord instance or a list of them. Just call ‘send_events` on a list of records and they will be auto-published. You can override `generate_payload` to make changes to the payload before it’s published.

You can also call this with a list of hashes representing attributes. This is common when using activerecord-import.

Constant Summary

Constants inherited from Producer

Producer::MAX_BATCH_SIZE

Class Method Summary collapse

Methods inherited from Producer

config, determine_backend_class, encoder, key_encoder, partition_key, produce_batch, publish, publish_list, topic, watched_attributes

Class Method Details

.generate_payload(attributes, _record) ⇒ Hash

Generate the payload, given a list of attributes or a record.. Can be overridden or added to by subclasses. is not set.

Parameters:

  • attributes (Hash)
  • _record (ActiveRecord::Base)

    May be nil if refetch_record

Returns:

  • (Hash)


59
60
61
62
63
64
65
66
67
68
# File 'lib/deimos/active_record_producer.rb', line 59

def generate_payload(attributes, _record)
  fields = self.encoder.schema_fields
  payload = attributes.stringify_keys
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !fields.map(&:name).include?(k)
  end
  return payload unless Utils::SchemaClass.use?(config.to_h)

  Utils::SchemaClass.instance(payload, config[:schema], config[:namespace])
end

.poll_query(time_from:, time_to:, column_name: :updated_at, min_id:) ⇒ ActiveRecord::Relation

Query to use when polling the database with the DbPoller. Add includes, joins, or wheres as necessary, or replace entirely. than this value).

Parameters:

  • time_from (Time)

    the time to start the query from.

  • time_to (Time)

    the time to end the query.

  • column_name (Symbol) (defaults to: :updated_at)

    the column name to look for.

  • min_id (Numeric)

    the minimum ID (i.e. all IDs must be greater

Returns:

  • (ActiveRecord::Relation)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/deimos/active_record_producer.rb', line 78

def poll_query(time_from:, time_to:, column_name: :updated_at, min_id:)
  klass = config[:record_class]
  table = ActiveRecord::Base.connection.quote_table_name(klass.table_name)
  column = ActiveRecord::Base.connection.quote_column_name(column_name)
  primary = ActiveRecord::Base.connection.quote_column_name(klass.primary_key)
  klass.where(
    "((#{table}.#{column} = ? AND #{table}.#{primary} > ?) \
     OR #{table}.#{column} > ?) AND #{table}.#{column} <= ?",
    time_from,
    min_id,
    time_from,
    time_to
  )
end

.post_process(_records) ⇒ Object

Post process records after publishing

Parameters:

  • records (Array<ActiveRecord::Base>)


95
96
# File 'lib/deimos/active_record_producer.rb', line 95

def post_process(_records)
end

.record_class(klass, refetch: true) ⇒ void

This method returns an undefined value.

Indicate the class this producer is working on. a record object, refetch the record to pass into the ‘generate_payload` method.

Parameters:

  • klass (Class)
  • refetch (Boolean) (defaults to: true)

    if true, and we are given a hash instead of



21
22
23
24
# File 'lib/deimos/active_record_producer.rb', line 21

def record_class(klass, refetch: true)
  config[:record_class] = klass
  config[:refetch_record] = refetch
end

.send_event(record, force_send: false) ⇒ void

This method returns an undefined value.

Parameters:

  • record (ActiveRecord::Base)
  • force_send (Boolean) (defaults to: false)


29
30
31
# File 'lib/deimos/active_record_producer.rb', line 29

def send_event(record, force_send: false)
  send_events([record], force_send: force_send)
end

.send_events(records, force_send: false) ⇒ void

This method returns an undefined value.

Parameters:

  • records (Array<ActiveRecord::Base>)
  • force_send (Boolean) (defaults to: false)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/deimos/active_record_producer.rb', line 36

def send_events(records, force_send: false)
  primary_key = config[:record_class]&.primary_key
  messages = records.map do |record|
    if record.respond_to?(:attributes)
      attrs = record.attributes.with_indifferent_access
    else
      attrs = record.with_indifferent_access
      if config[:refetch_record] && attrs[primary_key]
        record = config[:record_class].find(attrs[primary_key])
      end
    end
    generate_payload(attrs, record).with_indifferent_access
  end
  self.publish_list(messages, force_send: force_send)
  self.post_process(records)
end