Class: Sbmt::KafkaProducer::BaseProducer

Inherits:
Object
  • Object
show all
Extended by:
Dry::Initializer
Defined in:
lib/sbmt/kafka_producer/base_producer.rb

Direct Known Subclasses

OutboxProducer

Constant Summary collapse

MSG_SUCCESS =
"Message has been successfully sent to Kafka"

Instance Method Summary collapse

Instance Method Details

#async_publish(payload, options = {}) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 38

def async_publish(payload, options = {})
  async_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end

#async_publish!(payload, options = {}) ⇒ Object



31
32
33
34
35
36
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 31

def async_publish!(payload, options = {})
  around_publish do
    client.produce_async(payload: payload, **options.merge(topic: topic))
  end
  true
end

#sync_publish(payload, options = {}) ⇒ Object



23
24
25
26
27
28
29
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 23

def sync_publish(payload, options = {})
  sync_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end

#sync_publish!(payload, options = {}) ⇒ Object



13
14
15
16
17
18
19
20
21
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 13

def sync_publish!(payload, options = {})
  report, produce_duration = around_publish do
    measure_time do
      client.produce_sync(payload: payload, **options.merge(topic: topic))
    end
  end
  log_success(report, produce_duration)
  true
end