Class: Kafka::KafkaProducer
- Inherits:
-
Object
- Object
- Kafka::KafkaProducer
- Defined in:
- lib/jruby-kafka/kafka-producer.rb
Overview
noinspection JRubyStringImportInspection
Defined Under Namespace
Classes: RubyCallback
Constant Summary collapse
- KAFKA_PRODUCER =
Java::org.apache.kafka.clients.producer.KafkaProducer
- REQUIRED =
[ :bootstrap_servers, :key_serializer ]
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#producer ⇒ Object
readonly
Returns the value of attribute producer.
-
#send_cb_method ⇒ Object
readonly
Returns the value of attribute send_cb_method.
-
#send_method ⇒ Object
readonly
Returns the value of attribute send_method.
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
-
#initialize(opts = {}) ⇒ KafkaProducer
constructor
A new instance of KafkaProducer.
-
#send_msg(topic, partition, key, value, &block) ⇒ Object
Throws FailedToSendMessageException, or StandardError if the producer is not connected.
Constructor Details
#initialize(opts = {}) ⇒ KafkaProducer
Returns a new instance of KafkaProducer.
29 30 31 32 33 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 29 def initialize(opts = {}) Kafka::Utility.validate_arguments REQUIRED, opts @options = opts @send_method = @send_cb_method = proc { throw StandardError.new 'Producer is not connected' } end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
27 28 29 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 27 def options @options end |
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
27 28 29 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 27 def producer @producer end |
#send_cb_method ⇒ Object (readonly)
Returns the value of attribute send_cb_method.
27 28 29 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 27 def send_cb_method @send_cb_method end |
#send_method ⇒ Object (readonly)
Returns the value of attribute send_method.
27 28 29 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 27 def send_method @send_method end |
Instance Method Details
#close ⇒ Object
50 51 52 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 50 def close @producer.close end |
#connect ⇒ Object
35 36 37 38 39 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 35 def connect @producer = KAFKA_PRODUCER.new(Kafka::Utility.java_properties @options) @send_method = producer.java_method :send, [ProducerRecord] @send_cb_method = producer.java_method :send, [ProducerRecord, Callback.java_class] end |
#send_msg(topic, partition, key, value, &block) ⇒ Object
Throws FailedToSendMessageException, or StandardError if the producer is not connected.
42 43 44 45 46 47 48 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 42 def send_msg(topic, partition, key, value, &block) if block send_cb_method.call(ProducerRecord.new(topic, partition, key, value), RubyCallback.new(block)) else send_method.call(ProducerRecord.new(topic, partition, key, value)) end end |