Class: Racecar::Producer
- Inherits:
-
Object
- Object
- Racecar::Producer
- Defined in:
- lib/racecar/producer.rb
Constant Summary collapse
- @@mutex =
Mutex.new
Class Method Summary collapse
Instance Method Summary collapse
- #init_internal_producer(config) ⇒ Object
-
#initialize(config: nil, logger: nil, instrumenter: NullInstrumenter) ⇒ Producer
constructor
A new instance of Producer.
-
#produce_async(value:, topic:, **options) ⇒ Object
fire and forget - you won’t get any guarantees or feedback from Racecar on the status of the message and it won’t halt execution of the rest of your code.
-
#produce_sync(value:, topic:, **options) ⇒ Object
synchronous message production - will wait until the delivery handle succeeds, fails or times out.
-
#wait_for_delivery ⇒ Object
Blocks until all messages that have been asynchronously produced in the block have been delivered.
Constructor Details
#initialize(config: nil, logger: nil, instrumenter: NullInstrumenter) ⇒ Producer
Returns a new instance of Producer.
25 26 27 28 29 30 31 32 |
# File 'lib/racecar/producer.rb', line 25 def initialize(config: nil, logger: nil, instrumenter: NullInstrumenter) @config = config @logger = logger @delivery_handles = [] @instrumenter = instrumenter @batching = false @internal_producer = init_internal_producer(config) end |
Class Method Details
.shutdown! ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/racecar/producer.rb', line 16 def shutdown! @@mutex.synchronize do if !@internal_producer.nil? @internal_producer.close end end end |
Instance Method Details
#init_internal_producer(config) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/racecar/producer.rb', line 34 def init_internal_producer(config) @@mutex.synchronize do @@init_internal_producer ||= begin # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md producer_config = { "bootstrap.servers" => config.brokers.join(","), "client.id" => config.client_id, "statistics.interval.ms" => config.statistics_interval_ms, "message.timeout.ms" => config. * 1000, } producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil? producer_config.merge!(config.rdkafka_producer) Rdkafka::Config.new(producer_config).producer.tap do |producer| producer.delivery_callback = DeliveryCallback.new(instrumenter: @instrumenter) end end end end |
#produce_async(value:, topic:, **options) ⇒ Object
fire and forget - you won’t get any guarantees or feedback from Racecar on the status of the message and it won’t halt execution of the rest of your code.
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/racecar/producer.rb', line 56 def produce_async(value:, topic:, **) with_instrumentation(action: "produce_async", value: value, topic: topic, **) do begin handle = internal_producer.produce(payload: value, topic: topic, **) @delivery_handles << handle if @batching rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end nil end |
#produce_sync(value:, topic:, **options) ⇒ Object
synchronous message production - will wait until the delivery handle succeeds, fails or times out.
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/racecar/producer.rb', line 70 def produce_sync(value:, topic:, **) with_instrumentation(action: "produce_sync", value: value, topic: topic, **) do begin handle = internal_producer.produce(payload: value, topic: topic, **) deliver_with_error_handling(handle) rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end nil end |
#wait_for_delivery ⇒ Object
Blocks until all messages that have been asynchronously produced in the block have been delivered. Usage: messages = [
{value: "message1", topic: "topic1"},
{value: "message2", topic: "topic1"},
{value: "message3", topic: "topic2"}
]
Racecar.wait_for_delivery
.each do |msg|
Racecar.produce_async(value: msg[:value], topic: msg[:topic])
end
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/racecar/producer.rb', line 95 def wait_for_delivery @batching = true @delivery_handles.clear yield @delivery_handles.each do |handle| deliver_with_error_handling(handle) end ensure @delivery_handles.clear @batching = false nil end |