Class: Deimos::Producer
- Inherits:
-
Object
- Object
- Deimos::Producer
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb
Overview
Producer to publish messages to a given kafka topic.
Direct Known Subclasses
Constant Summary collapse
- MAX_BATCH_SIZE =
500
Class Method Summary collapse
- .config ⇒ Hash
- .determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
- .encoder ⇒ Deimos::SchemaBackends::Base
- .key_encoder ⇒ Deimos::SchemaBackends::Base
-
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key).
-
.produce_batch(backend, batch) ⇒ void
Send a batch to the backend.
-
.publish(payload, topic: self.topic, headers: nil) ⇒ void
Publish the payload to the topic.
-
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
Publish a list of messages.
-
.topic(topic = nil) ⇒ String
Set the topic.
-
.watched_attributes ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates.
Class Method Details
.config ⇒ Hash
68 69 70 71 72 73 |
# File 'lib/deimos/producer.rb', line 68 def config @config ||= { encode_key: true, namespace: Deimos.config.producers.schema_namespace } end |
.determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/deimos/producer.rb', line 138 def determine_backend_class(sync, force_send) backend = if force_send :kafka else Deimos.config.producers.backend end if backend == :kafka_async && sync backend = :kafka elsif backend == :kafka && sync == false backend = :kafka_async end "Deimos::Backends::#{backend.to_s.classify}".constantize end |
.encoder ⇒ Deimos::SchemaBackends::Base
161 162 163 164 |
# File 'lib/deimos/producer.rb', line 161 def encoder @encoder ||= Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) end |
.key_encoder ⇒ Deimos::SchemaBackends::Base
167 168 169 170 |
# File 'lib/deimos/producer.rb', line 167 def key_encoder @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema], namespace: config[:namespace]) end |
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). Will include ‘payload_key` if it is part of the original payload.
91 92 93 |
# File 'lib/deimos/producer.rb', line 91 def partition_key(_payload) nil end |
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
156 157 158 |
# File 'lib/deimos/producer.rb', line 156 def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end |
.publish(payload, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
100 101 102 |
# File 'lib/deimos/producer.rb', line 100 def publish(payload, topic: self.topic, headers: nil) publish_list([payload], topic: topic, headers: headers) end |
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish a list of messages. whether to publish synchronously. and send immediately to Kafka.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/deimos/producer.rb', line 113 def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) return if Deimos.config.kafka.seed_brokers.blank? || Deimos.config.producers.disabled || Deimos.producers_disabled?(self) raise 'Topic not specified. Please specify the topic.' if topic.blank? backend_class = determine_backend_class(sync, force_send) Deimos.instrument( 'encode_messages', producer: self, topic: topic, payloads: payloads ) do = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } .each { |m| (m, topic) } .in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend_class, batch) end end end |
.topic(topic = nil) ⇒ String
Set the topic.
78 79 80 81 82 83 84 85 |
# File 'lib/deimos/producer.rb', line 78 def topic(topic=nil) if topic config[:topic] = topic return end # accessor "#{Deimos.config.producers.topic_prefix}#{config[:topic]}" end |
.watched_attributes ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates
175 176 177 |
# File 'lib/deimos/producer.rb', line 175 def watched_attributes self.encoder.schema_fields.map(&:name) end |