Class: Kafka::KafkaProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/jruby-kafka/kafka-producer.rb

Overview

noinspection JRubyStringImportInspection

Defined Under Namespace

Classes: RubyCallback

Constant Summary collapse

KAFKA_PRODUCER =
Java::org.apache.kafka.clients.producer.KafkaProducer
REQUIRED =
[
  :bootstrap_servers, :key_serializer
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KafkaProducer

Returns a new instance of KafkaProducer.



29
30
31
32
33
# File 'lib/jruby-kafka/kafka-producer.rb', line 29

# Builds a producer that is configured but not yet connected.
#
# The options are handed to Kafka::Utility.validate_arguments together with
# REQUIRED (presumably rejecting a hash that lacks a required key; confirm
# against that helper) and then stored for later use by #connect.
#
# @param opts [Hash] producer configuration; kept as-is in @options
# @return [KafkaProducer]
def initialize(opts = {})
  Kafka::Utility.validate_arguments REQUIRED, opts
  @options = opts
  # Both send slots share one placeholder until #connect replaces them.
  # `raise` (not `throw`) is required here: `throw` is the catch/throw
  # control-flow primitive and would surface as an "uncaught throw" error
  # instead of the intended StandardError with this message.
  @send_method = @send_cb_method = proc { raise StandardError, 'Producer is not connected' }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/jruby-kafka/kafka-producer.rb', line 27

# Returns the options hash that #initialize stored in @options.
def options
  @options
end

#producer ⇒ Object (readonly)

Returns the value of attribute producer.



27
28
29
# File 'lib/jruby-kafka/kafka-producer.rb', line 27

# Returns the KAFKA_PRODUCER instance assigned by #connect.
# #initialize does not set @producer, so this is nil until #connect has run.
def producer
  @producer
end

#send_cb_method ⇒ Object (readonly)

Returns the value of attribute send_cb_method.



27
28
29
# File 'lib/jruby-kafka/kafka-producer.rb', line 27

# Returns the callable that #send_msg invokes when it is given a block.
# #initialize sets it to a placeholder proc; #connect replaces it with the
# producer's `send` bound to the (ProducerRecord, Callback) signature.
def send_cb_method
  @send_cb_method
end

#send_method ⇒ Object (readonly)

Returns the value of attribute send_method.



27
28
29
# File 'lib/jruby-kafka/kafka-producer.rb', line 27

# Returns the callable that #send_msg invokes when no block is given.
# #initialize sets it to a placeholder proc; #connect replaces it with the
# producer's `send` bound to the single-argument (ProducerRecord) signature.
def send_method
  @send_method
end

Instance Method Details

#close ⇒ Object



50
51
52
# File 'lib/jruby-kafka/kafka-producer.rb', line 50

# Closes the underlying producer, if one has been created.
#
# @producer is only assigned by #connect, so calling this on an instance
# that was never connected is a no-op rather than a NoMethodError on nil.
#
# @return [Object, nil] the result of the producer's own `close`, or nil
#   when there is no producer to close
def close
  @producer.close if @producer
end

#connect ⇒ Object



35
36
37
38
39
# File 'lib/jruby-kafka/kafka-producer.rb', line 35

# Instantiates the Java producer and binds the two `send` overloads.
#
# The stored options are converted with Kafka::Utility.java_properties and
# passed to KAFKA_PRODUCER.new. The placeholder callables installed by
# #initialize are then replaced with `java_method` handles for
# send(ProducerRecord) and send(ProducerRecord, Callback).
#
# @return [Object] the callback-taking send handle (the last assignment)
def connect
  properties = Kafka::Utility.java_properties(@options)
  @producer = KAFKA_PRODUCER.new(properties)
  @send_method = producer.java_method(:send, [ProducerRecord])
  @send_cb_method = producer.java_method(:send, [ProducerRecord, Callback.java_class])
end

#send_msg(topic, partition, key, value, &block) ⇒ Object

Raises FailedToSendMessageException or, if the producer is not connected, StandardError.



42
43
44
45
46
47
48
# File 'lib/jruby-kafka/kafka-producer.rb', line 42

# Sends one message, optionally with a completion callback.
#
# A ProducerRecord is built from the four arguments. Without a block it is
# passed to send_method; with a block, the block is wrapped in a
# RubyCallback and both are passed to send_cb_method.
#
# @param topic     passed through to ProducerRecord.new
# @param partition passed through to ProducerRecord.new
# @param key       passed through to ProducerRecord.new
# @param value     passed through to ProducerRecord.new
# @yield optional block, handed to RubyCallback.new
# @return [Object] whatever the invoked send callable returns
def send_msg(topic, partition, key, value, &block)
  record = ProducerRecord.new(topic, partition, key, value)
  return send_method.call(record) unless block

  send_cb_method.call(record, RubyCallback.new(block))
end