Class: Deimos::Producer

Inherits:
Object
  • Object
show all
Includes:
SharedConfig
Defined in:
lib/deimos/producer.rb

Overview

Producer to publish messages to a given kafka topic.

Direct Known Subclasses

ActiveRecordProducer

Constant Summary collapse

MAX_BATCH_SIZE =

Returns:

  • (Integer)
500

Class Method Summary collapse

Class Method Details

.config ⇒ Hash

Returns:

  • (Hash)


68
69
70
71
72
73
# File 'lib/deimos/producer.rb', line 68

# Memoized configuration hash for this producer.
#
# On first access the hash is seeded with `encode_key: true` and the schema
# namespace read from the global producers configuration. Later calls return
# the very same Hash object, so entries written into it afterwards are kept.
#
# @return [Hash]
def config
  return @config if @config

  default_namespace = Deimos.config.producers.schema_namespace
  @config = { encode_key: true, namespace: default_namespace }
end

.determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>

Parameters:

  • sync (Boolean)
  • force_send (Boolean)

Returns:

  • (Class<Deimos::Backends::Base>)


138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/deimos/producer.rb', line 138

# Resolve the backend class to publish with.
#
# The starting point is `:kafka` when `force_send` is truthy, otherwise the
# backend named in the global producers configuration. `sync` then adjusts it:
# a truthy `sync` turns `:kafka_async` into `:kafka`, and a `sync` of exactly
# `false` turns `:kafka` into `:kafka_async` (this also applies to the
# `:kafka` chosen by `force_send`). A `nil` `sync` leaves the choice alone,
# and any other backend is never altered by `sync`.
#
# @param sync [Boolean, nil]
# @param force_send [Boolean]
# @return [Class<Deimos::Backends::Base>]
def determine_backend_class(sync, force_send)
  configured = force_send ? :kafka : Deimos.config.producers.backend

  chosen =
    if configured == :kafka_async && sync
      :kafka
    elsif configured == :kafka && sync == false # nil must not match here
      :kafka_async
    else
      configured
    end

  "Deimos::Backends::#{chosen.to_s.classify}".constantize
end

.encoder ⇒ Deimos::SchemaBackends::Base



161
162
163
164
# File 'lib/deimos/producer.rb', line 161

# Memoized schema backend for message payloads, built from the `:schema`
# and `:namespace` entries of this producer's config.
#
# @return [Deimos::SchemaBackends::Base]
def encoder
  return @encoder if @encoder

  schema_name = config[:schema]
  schema_namespace = config[:namespace]
  @encoder = Deimos.schema_backend(schema: schema_name, namespace: schema_namespace)
end

.key_encoder ⇒ Deimos::SchemaBackends::Base



167
168
169
170
# File 'lib/deimos/producer.rb', line 167

# Memoized schema backend for message keys, built from the `:key_schema`
# and `:namespace` entries of this producer's config.
#
# @return [Deimos::SchemaBackends::Base]
def key_encoder
  return @key_encoder if @key_encoder

  key_schema_name = config[:key_schema]
  schema_namespace = config[:namespace]
  @key_encoder = Deimos.schema_backend(schema: key_schema_name, namespace: schema_namespace)
end

.partition_key(_payload) ⇒ String

Override the default partition key (which is the payload key). Will include `payload_key` if it is part of the original payload.

Parameters:

  • _payload (Hash)

    the payload being passed into the produce method.

Returns:

  • (String)


91
92
93
# File 'lib/deimos/producer.rb', line 91

# Hook for overriding the default partition key (which is the payload key).
# This base implementation ignores its argument and always returns nil;
# presumably nil tells the caller to fall back to the payload key — confirm
# against the message-processing code.
#
# @param _payload [Hash] the payload being passed into the produce method
#   (unused here).
# @return [String, nil] nil in this default implementation; overrides are
#   documented as returning a String.
def partition_key(_payload)
  nil
end

.produce_batch(backend, batch) ⇒ void

This method returns an undefined value.

Send a batch to the backend.

Parameters:

  • backend (Class<Deimos::Backends::Base>)
  • batch (Array<Deimos::Message>)


156
157
158
# File 'lib/deimos/producer.rb', line 156

# Send a batch to the backend by calling its `publish` with this producer
# as `producer_class` and the batch as `messages`.
#
# @param backend [Class<Deimos::Backends::Base>] the backend class, as
#   resolved by `determine_backend_class` in `publish_list`.
# @param batch [Array<Deimos::Message>] the messages to hand over.
# @return [void]
def produce_batch(backend, batch)
  backend.publish(producer_class: self, messages: batch)
end

.publish(payload, topic: self.topic, headers: nil) ⇒ void

This method returns an undefined value.

Publish the payload to the topic.

Parameters:

  • payload (Hash, SchemaClass::Record)

    with an optional payload_key hash key.

  • topic (String) (defaults to: self.topic)

    if specifying the topic

  • headers (Hash) (defaults to: nil)

    if specifying headers



100
101
102
# File 'lib/deimos/producer.rb', line 100

# Publish the payload to the topic. Convenience wrapper: wraps the single
# payload in a one-element array and delegates to `publish_list`, leaving
# `sync` and `force_send` at their defaults.
#
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @return [void]
def publish(payload, topic: self.topic, headers: nil)
  publish_list([payload], topic: topic, headers: headers)
end

.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void

This method returns an undefined value.

Publish a list of messages.

Parameters:

  • payloads (Array<Hash, SchemaClass::Record>)

    with optional payload_key hash key.

  • sync (Boolean) (defaults to: nil)

    if given, override the default setting of whether to publish synchronously.

  • force_send (Boolean) (defaults to: false)

    if true, ignore the configured backend and send immediately to Kafka.

  • topic (String) (defaults to: self.topic)

    if specifying the topic

  • headers (Hash) (defaults to: nil)

    if specifying headers



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/deimos/producer.rb', line 113

# Publish a list of messages.
#
# Returns without doing anything when no seed brokers are configured, when
# producers are disabled in the global config, or when Deimos reports this
# producer as disabled. Otherwise each payload is wrapped in a
# Deimos::Message, passed through `_process_message`, and the messages are
# handed to the backend in slices of at most MAX_BATCH_SIZE, all inside an
# 'encode_messages' instrumentation block.
#
# @param payloads [Array<Hash, SchemaClass::Record>] with optional
#   payload_key hash key.
# @param sync [Boolean] if given, override the default setting of whether
#   to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend and
#   send immediately to Kafka.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @raise [RuntimeError] if the topic is blank.
# @return [void]
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
  publishing_off = Deimos.config.kafka.seed_brokers.blank? ||
                   Deimos.config.producers.disabled ||
                   Deimos.producers_disabled?(self)
  return if publishing_off

  raise 'Topic not specified. Please specify the topic.' if topic.blank?

  backend_class = determine_backend_class(sync, force_send)
  Deimos.instrument('encode_messages', producer: self, topic: topic, payloads: payloads) do
    messages = Array(payloads).map do |payload|
      Deimos::Message.new(payload.to_h, self, headers: headers)
    end
    messages.each { |message| _process_message(message, topic) }
    # Every message is processed before the first batch is sent.
    messages.each_slice(MAX_BATCH_SIZE) do |batch|
      self.produce_batch(backend_class, batch)
    end
  end
end

.topic(topic = nil) ⇒ String

Set the topic.

Parameters:

  • topic (String) (defaults to: nil)

Returns:

  • (String)

    the current topic if no argument given.



78
79
80
81
82
83
84
85
# File 'lib/deimos/producer.rb', line 78

# Combined setter/getter for the producer's topic.
#
# With an argument, stores it under `config[:topic]` and returns nil.
# Without one, returns the configured topic prefixed with the global
# `topic_prefix`; a nil prefix or topic interpolates as an empty string.
#
# @param topic [String]
# @return [String] the current topic if no argument given.
def topic(topic = nil)
  # accessor
  return "#{Deimos.config.producers.topic_prefix}#{config[:topic]}" unless topic

  config[:topic] = topic
  nil
end

.watched_attributes ⇒ Array<String>

Override this in active record producers to add non-schema fields to check for updates

Returns:

  • (Array<String>)

    fields to check for updates



175
176
177
# File 'lib/deimos/producer.rb', line 175

# Names of the fields to check for updates: by default, the name of every
# field exposed by the encoder's `schema_fields`. Override this in active
# record producers to add non-schema fields.
#
# @return [Array<String>] fields to check for updates
def watched_attributes
  fields = encoder.schema_fields
  fields.map(&:name)
end