Class: Racecar::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/producer.rb

Constant Summary collapse

@@mutex =
Mutex.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: nil, logger: nil, instrumenter: NullInstrumenter) ⇒ Producer

Returns a new instance of Producer.



25
26
27
28
29
30
31
32
# File 'lib/racecar/producer.rb', line 25

def initialize(config: nil, logger: nil, instrumenter: NullInstrumenter)
  @config = config
  @logger = logger
  @delivery_handles = []
  @instrumenter = instrumenter
  @batching = false
  @internal_producer = init_internal_producer(config)
end

Class Method Details

.shutdown!Object



16
17
18
19
20
21
22
# File 'lib/racecar/producer.rb', line 16

def shutdown!
  @@mutex.synchronize do
    if !@internal_producer.nil?
      @internal_producer.close
    end
  end
end

Instance Method Details

#init_internal_producer(config) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/racecar/producer.rb', line 34

def init_internal_producer(config)
  @@mutex.synchronize do
    @@init_internal_producer ||= begin
      # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
      producer_config = {
        "bootstrap.servers"      => config.brokers.join(","),
        "client.id"              => config.client_id,
        "statistics.interval.ms" => config.statistics_interval_ms,
        "message.timeout.ms"     => config.message_timeout * 1000,
      }
      producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
      producer_config.merge!(config.rdkafka_producer)
      Rdkafka::Config.new(producer_config).producer.tap do |producer|
        producer.delivery_callback = DeliveryCallback.new(instrumenter: @instrumenter)
      end
    end
  end
end

#produce_async(value:, topic:, **options) ⇒ Object

fire and forget - you won’t get any guarantees or feedback from Racecar on the status of the message and it won’t halt execution of the rest of your code.



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/racecar/producer.rb', line 56

def produce_async(value:, topic:, **options)
  with_instrumentation(action: "produce_async", value: value, topic: topic, **options) do
    begin
      handle = internal_producer.produce(payload: value, topic: topic, **options)
      @delivery_handles << handle if @batching
    rescue Rdkafka::RdkafkaError => e
      raise MessageDeliveryError.new(e, handle)
    end
  end

  nil
end

#produce_sync(value:, topic:, **options) ⇒ Object

synchronous message production - will wait until the delivery handle succeeds, fails or times out.



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/racecar/producer.rb', line 70

def produce_sync(value:, topic:, **options)
  with_instrumentation(action: "produce_sync", value: value, topic: topic, **options) do
    begin
      handle = internal_producer.produce(payload: value, topic: topic, **options)
      deliver_with_error_handling(handle)
    rescue Rdkafka::RdkafkaError => e
      raise MessageDeliveryError.new(e, handle)
    end
  end

  nil
end

#wait_for_deliveryObject

Blocks until all messages that have been asynchronously produced in the block have been delivered. Usage: messages = [

{value: "message1", topic: "topic1"},
{value: "message2", topic: "topic1"},
{value: "message3", topic: "topic2"}
]

Racecar.wait_for_delivery

messages.each do |msg|
  Racecar.produce_async(value: msg[:value], topic: msg[:topic])
end



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/racecar/producer.rb', line 95

def wait_for_delivery
  @batching = true
  @delivery_handles.clear
  yield
  @delivery_handles.each do |handle|
    deliver_with_error_handling(handle)
  end
ensure
  @delivery_handles.clear
  @batching = false

  nil
end