Class: ActionMailerKafka::BaseProducer
- Inherits: Object
- Inheritance chain: ActionMailerKafka::BaseProducer < Object
- Defined in: lib/action_mailer_kafka/base_producer.rb
Constant Summary
- DELIVERY_INTERVAL = 30
  Triggers a delivery every half a minute (30 seconds).
- BUFFER_SIZE = 20
  Triggers a delivery once 20 emails are buffered.
- MAX_RETRIES = 2
- RETRY_BACKOFF = 5
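The two delivery constants describe when buffered messages are flushed: whichever limit is reached first triggers a delivery. The sketch below is a simplified, single-threaded model of that rule, not ruby-kafka's implementation. `ThresholdBuffer` and its methods are hypothetical names, the elapsed time is checked only when a message is pushed (a real async producer would use a background timer), and the clock is injected so the example can run without waiting.

```ruby
# Simplified model of "flush on size OR on elapsed time".
# ThresholdBuffer is a hypothetical class for illustration only.
class ThresholdBuffer
  DELIVERY_INTERVAL = 30 # seconds, same value as BaseProducer
  BUFFER_SIZE = 20       # messages, same value as BaseProducer

  attr_reader :delivered

  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @buffer = []
    @delivered = [] # each entry is one flushed batch
    @last_flush = @clock.call
  end

  # Buffer a message, then flush if either limit has been reached.
  def push(message)
    @buffer << message
    flush if @buffer.size >= BUFFER_SIZE || interval_elapsed?
  end

  # Number of messages still waiting in the buffer.
  def pending
    @buffer.size
  end

  private

  def interval_elapsed?
    @clock.call - @last_flush >= DELIVERY_INTERVAL
  end

  def flush
    @delivered << @buffer.dup
    @buffer.clear
    @last_flush = @clock.call
  end
end
```

With these values, a burst of 20 emails is flushed as one batch, while a slow trickle is flushed once at least 30 seconds have passed since the previous flush.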
Instance Method Summary
- #initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil) ⇒ BaseProducer (constructor)
  Returns a new instance of BaseProducer.
- #publish(data, message_key, topic) ⇒ Object
Constructor Details
#initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil) ⇒ BaseProducer
Returns a new instance of BaseProducer.
# File 'lib/action_mailer_kafka/base_producer.rb', line 8
def initialize(
  kafka_client_info:,
  transactional_id: Socket.gethostname,
  logger: nil
)
  @logger = logger
  kafka_client = ::Kafka.new(kafka_client_info)
  @kafka_async_producer = kafka_client.async_producer(
    delivery_threshold: BUFFER_SIZE,
    delivery_interval: DELIVERY_INTERVAL,
    max_retries: MAX_RETRIES,
    retry_backoff: RETRY_BACKOFF,
    idempotent: true,
    required_acks: :all,
    transactional_id: transactional_id
  )
end
Instance Method Details
#publish(data, message_key, topic) ⇒ Object
# File 'lib/action_mailer_kafka/base_producer.rb', line 26
def publish(data, message_key, topic)
  @kafka_async_producer.produce(data, key: message_key, topic: topic)
  # The method name on this line was lost in extraction; deliver_messages is assumed.
  @kafka_async_producer.deliver_messages
rescue Kafka::DeliveryFailed => e
  @logger&.error("Fail to deliver some kafka messages: #{e}")
end
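Because `publish` rescues `Kafka::DeliveryFailed`, a failed delivery is logged rather than raised to the caller. The sketch below reproduces that control flow with stand-ins so it runs without a broker or the ruby-kafka gem. `FakeDeliveryFailed`, `FlakyProducer`, and `SketchProducer` are hypothetical names, and the `deliver_messages` call is an assumption about which method is invoked after `produce`.

```ruby
require "logger"
require "stringio"

# Stand-in for Kafka::DeliveryFailed (hypothetical, for illustration).
class FakeDeliveryFailed < StandardError; end

# Stand-in async producer that records messages and can be told to fail.
class FlakyProducer
  attr_reader :produced

  def initialize(fail_delivery: false)
    @fail_delivery = fail_delivery
    @produced = []
  end

  def produce(data, key:, topic:)
    @produced << { data: data, key: key, topic: topic }
  end

  def deliver_messages
    raise FakeDeliveryFailed, "broker unreachable" if @fail_delivery
  end
end

# Mirrors the shape of BaseProducer#publish with an injected producer.
class SketchProducer
  def initialize(producer:, logger: nil)
    @producer = producer
    @logger = logger
  end

  def publish(data, message_key, topic)
    @producer.produce(data, key: message_key, topic: topic)
    @producer.deliver_messages
  rescue FakeDeliveryFailed => e
    # Logged, not re-raised; with logger: nil the failure is dropped silently.
    @logger&.error("Fail to deliver some kafka messages: #{e}")
  end
end

log = StringIO.new
sketch = SketchProducer.new(
  producer: FlakyProducer.new(fail_delivery: true),
  logger: Logger.new(log)
)
sketch.publish("email body", "user-42", "mailers")
puts log.string # contains the logged delivery failure
```

In this flow the logger is the only signal that a delivery failed, so callers that leave `logger` as `nil` will not see the error at all.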