Module: Deimos::KafkaSource
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/deimos/kafka_source.rb
Overview
Represents an object which needs to inform Kafka when it is saved or bulk imported.
Defined Under Namespace
Modules: ClassMethods
Constant Summary
- DEPRECATION_WARNING =
    'The kafka_producer interface will be deprecated ' \
    'in future releases. Please use kafka_producers instead.'
Instance Method Summary
- #deletion_payload ⇒ Hash
  Payload to send after we are destroyed.
- #send_kafka_event_on_create ⇒ void
  Send the newly created model to Kafka.
- #send_kafka_event_on_destroy ⇒ void
  Send a deletion (null payload) event to Kafka.
- #send_kafka_event_on_update ⇒ void
  Send the newly updated model to Kafka.
Instance Method Details
#deletion_payload ⇒ Hash
Payload to send after we are destroyed.
    # File 'lib/deimos/kafka_source.rb', line 56
    def deletion_payload
      { payload_key: self[self.class.primary_key] }
    end
#send_kafka_event_on_create ⇒ void
This method returns an undefined value.
Send the newly created model to Kafka.
    # File 'lib/deimos/kafka_source.rb', line 21
    def send_kafka_event_on_create
      return unless self.persisted?
      return unless self.class.kafka_config[:create]

      self.class.kafka_producers.each { |p| p.send_event(self) }
    end
#send_kafka_event_on_destroy ⇒ void
This method returns an undefined value.
Send a deletion (null payload) event to Kafka.
    # File 'lib/deimos/kafka_source.rb', line 48
    def send_kafka_event_on_destroy
      return unless self.class.kafka_config[:delete]

      self.class.kafka_producers.each { |p| p.publish_list([self.deletion_payload]) }
    end
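The destroy path can be traced without Kafka or a database using stand-in classes. The following is a minimal sketch, not the library's own code: RecordingProducer and Widget are hypothetical names, and Widget only imitates the pieces of an ActiveRecord model that the method above touches (primary_key, attribute access via [], kafka_config and kafka_producers).

```ruby
# Hypothetical stand-in producer that records what it is asked to publish
# instead of talking to Kafka.
class RecordingProducer
  @published = []

  class << self
    attr_reader :published

    def publish_list(payloads)
      @published.concat(payloads)
    end
  end
end

# Hypothetical stand-in for a model. A real model would be an ActiveRecord
# class that includes Deimos::KafkaSource rather than redefining these methods.
class Widget
  def self.primary_key
    'id'
  end

  def self.kafka_config
    { delete: true }
  end

  def self.kafka_producers
    [RecordingProducer]
  end

  def initialize(attrs)
    @attrs = attrs
  end

  def [](key)
    @attrs[key]
  end

  # Same shape as the documented method: only the primary key is sent.
  def deletion_payload
    { payload_key: self[self.class.primary_key] }
  end

  def send_kafka_event_on_destroy
    return unless self.class.kafka_config[:delete]

    self.class.kafka_producers.each { |p| p.publish_list([deletion_payload]) }
  end
end

Widget.new('id' => 42).send_kafka_event_on_destroy
RecordingProducer.published # => [{ payload_key: 42 }]
```

Each configured producer receives a single-element list whose only field is payload_key, which matches the "deletion (null payload)" description: the message identifies the record but carries no body.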
#send_kafka_event_on_update ⇒ void
This method returns an undefined value.
Send the newly updated model to Kafka.
    # File 'lib/deimos/kafka_source.rb', line 30
    def send_kafka_event_on_update
      return unless self.class.kafka_config[:update]

      producers = self.class.kafka_producers
      fields = producers.flat_map(&:watched_attributes).uniq
      fields -= ['updated_at']
      # Only send an event if a field we care about was changed.
      any_changes = fields.any? do |field|
        field_change = self.previous_changes[field]
        field_change.present? && field_change[0] != field_change[1]
      end
      return unless any_changes

      producers.each { |p| p.send_event(self) }
    end
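The change check in send_kafka_event_on_update can be sketched in plain Ruby. This is an illustration under assumptions, not the library's code: relevant_change? is a hypothetical helper, the hashes stand in for ActiveRecord's previous_changes (attribute name mapped to an [old, new] pair), and present? is approximated by a non-nil, non-empty test so the block runs without ActiveSupport.

```ruby
# Hypothetical helper mirroring the documented check: an update counts only
# if a watched field other than updated_at actually changed value.
def relevant_change?(previous_changes, watched_fields)
  fields = watched_fields.uniq - ['updated_at']
  fields.any? do |field|
    change = previous_changes[field]
    # Plain-Ruby approximation of ActiveSupport's `present?` for an array.
    !change.nil? && !change.empty? && change[0] != change[1]
  end
end

# Stand-ins for previous_changes: attribute name => [old value, new value].
only_timestamp = { 'updated_at' => ['2024-01-01', '2024-01-02'] }
name_changed = {
  'name' => ['old', 'new'],
  'updated_at' => ['2024-01-01', '2024-01-02']
}

relevant_change?(only_timestamp, %w[name updated_at]) # => false
relevant_change?(name_changed, %w[name updated_at])   # => true
```

Excluding updated_at means a save that only bumps the timestamp produces no event, even when a producer lists updated_at among its watched attributes.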