Class: Sbmt::KafkaProducer::BaseProducer
- Inherits: Object
- Inheritance chain: Object → Sbmt::KafkaProducer::BaseProducer
- Extended by:
- Dry::Initializer
- Defined in:
- lib/sbmt/kafka_producer/base_producer.rb
Direct Known Subclasses
Constant Summary
- MSG_SUCCESS = "Message has been successfully sent to Kafka"
Instance Method Summary
- #async_publish(payload, options = {}) ⇒ Object
- #async_publish!(payload, options = {}) ⇒ Object
- #sync_publish(payload, options = {}) ⇒ Object
- #sync_publish!(payload, options = {}) ⇒ Object
Instance Method Details
#async_publish(payload, options = {}) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 38

# Non-raising variant of #async_publish!.
#
# Delegates to #async_publish! and converts a WaterDrop produce failure into
# a boolean result instead of letting it propagate. Any other exception class
# is not rescued here and still reaches the caller.
#
# @param payload [Object] message body, forwarded unchanged to #async_publish!
# @param options [Hash] extra produce options, forwarded unchanged to #async_publish!
# @return [Boolean] true when #async_publish! returned without raising,
#   false when WaterDrop::Errors::ProduceError was rescued (after passing it to log_error)
def async_publish(payload, options = {})
  async_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end
#async_publish!(payload, options = {}) ⇒ Object
31 32 33 34 35 36 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 31

# Hands the message to the client's asynchronous produce call.
#
# The call is wrapped in around_publish (defined outside this listing).
# No exception is rescued here, so anything raised by around_publish or
# client.produce_async propagates to the caller.
#
# @param payload [Object] message body, passed as the `payload:` keyword
# @param options [Hash] extra keywords for client.produce_async; a :topic
#   entry is overridden by this producer's own `topic`
# @return [true] always true when no exception is raised
def async_publish!(payload, options = {})
  around_publish do
    # merge puts `topic` last, so the producer's topic wins over options[:topic].
    client.produce_async(payload: payload, **options.merge(topic: topic))
  end
  true
end
#sync_publish(payload, options = {}) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 23

# Non-raising variant of #sync_publish!.
#
# Delegates to #sync_publish! and converts a WaterDrop produce failure into
# a boolean result instead of letting it propagate. Any other exception class
# is not rescued here and still reaches the caller.
#
# @param payload [Object] message body, forwarded unchanged to #sync_publish!
# @param options [Hash] extra produce options, forwarded unchanged to #sync_publish!
# @return [Boolean] true when #sync_publish! returned without raising,
#   false when WaterDrop::Errors::ProduceError was rescued (after passing it to log_error)
def sync_publish(payload, options = {})
  sync_publish!(payload, options)
  true
rescue WaterDrop::Errors::ProduceError => e
  log_error(e)
  false
end
#sync_publish!(payload, options = {}) ⇒ Object
13 14 15 16 17 18 19 20 21 |
# File 'lib/sbmt/kafka_producer/base_producer.rb', line 13

# Sends the message through the client's synchronous produce call and logs
# the outcome.
#
# The produce call is wrapped in measure_time, which is itself wrapped in
# around_publish (both defined outside this listing). No exception is rescued
# here, so anything raised by those wrappers or by client.produce_sync
# propagates to the caller and log_success is skipped.
#
# @param payload [Object] message body, passed as the `payload:` keyword
# @param options [Hash] extra keywords for client.produce_sync; a :topic
#   entry is overridden by this producer's own `topic`
# @return [true] always true when no exception is raised
def sync_publish!(payload, options = {})
  # NOTE(review): the destructuring assumes around_publish returns its block's
  # value and measure_time returns a [result, duration] pair — confirm against
  # their definitions.
  report, produce_duration = around_publish do
    measure_time do
      # merge puts `topic` last, so the producer's topic wins over options[:topic].
      client.produce_sync(payload: payload, **options.merge(topic: topic))
    end
  end
  log_success(report, produce_duration)
  true
end