Class: ActionMailerKafka::BaseProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/action_mailer_kafka/base_producer.rb

Constant Summary collapse

DELIVERY_INTERVAL =

Trigger a delivery every half minute (30 seconds).

30
BUFFER_SIZE =

Trigger a delivery once 20 emails have been buffered.

20
MAX_RETRIES =
2
RETRY_BACKOFF =
5

Instance Method Summary collapse

Constructor Details

#initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil) ⇒ BaseProducer

Returns a new instance of BaseProducer.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/action_mailer_kafka/base_producer.rb', line 8

# Creates the Kafka client and the asynchronous producer used by #publish.
#
# @param kafka_client_info passed unchanged to +::Kafka.new+
# @param transactional_id forwarded to the producer as +transactional_id:+;
#   defaults to this machine's hostname
# @param logger optional logger kept for later error reporting; may be nil
def initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil)
  @logger = logger
  client = ::Kafka.new(kafka_client_info)

  # Batching and retry settings come from the class constants; the remaining
  # options are fixed for every producer built here.
  producer_options = {
    delivery_threshold: BUFFER_SIZE,
    delivery_interval: DELIVERY_INTERVAL,
    max_retries: MAX_RETRIES,
    retry_backoff: RETRY_BACKOFF,
    idempotent: true,
    required_acks: :all,
    transactional_id: transactional_id
  }
  @kafka_async_producer = client.async_producer(**producer_options)
end

Instance Method Details

#publish(data, message_key, topic) ⇒ Object



26
27
28
29
30
31
# File 'lib/action_mailer_kafka/base_producer.rb', line 26

# Hands one message to the async producer and then asks it to deliver.
#
# @param data payload passed to the producer's +produce+
# @param message_key forwarded as the +key:+ option
# @param topic forwarded as the +topic:+ option
# @return the result of +deliver_messages+, or the result of the logger call
#   (nil when no logger is set) if Kafka::DeliveryFailed was rescued
def publish(data, message_key, topic)
  producer = @kafka_async_producer
  producer.produce(data, key: message_key, topic: topic)
  producer.deliver_messages
rescue Kafka::DeliveryFailed => e
  # Delivery failures are only logged, never re-raised; without a logger
  # they are silently dropped.
  @logger&.error("Fail to deliver some kafka messages: #{e}")
end