Class: Kafka::Producer
- Inherits:
-
Object
- Object
- Kafka::Producer
- Extended by:
- Gem::Deprecate
- Defined in:
- lib/jruby-kafka/producer.rb
Overview
noinspection JRubyStringImportInspection
Constant Summary collapse
- KAFKA_PRODUCER =
Java::kafka.javaapi.producer.Producer
- REQUIRED =
[ :metadata_broker_list ]
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#producer ⇒ Object
readonly
Returns the value of attribute producer.
-
#send_method ⇒ Object
readonly
Returns the value of attribute send_method.
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
-
#initialize(opts = {}) ⇒ Producer
constructor
Create a Kafka Producer.
-
#send_msg(topic, key, msg) ⇒ Object
throws FailedToSendMessageException or if not connected, StandardError.
- #sendMsg(topic, key, msg) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Producer
Create a Kafka Producer
options: metadata_broker_list: [“localhost:9092”] - REQUIRED: a seed list of kafka brokers
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/jruby-kafka/producer.rb', line 23 def initialize(opts = {}) @options = opts if [:broker_list] [:metadata_broker_list] = .delete :broker_list end if [:metadata_broker_list].is_a? Array [:metadata_broker_list] = [:metadata_broker_list].join(',') end if [:compressed_topics].is_a? Array [:compressed_topics] = [:compressed_topics].join(',') end Kafka::Utility.validate_arguments REQUIRED, @send_method = proc { throw StandardError.new 'Producer is not connected' } end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
17 18 19 |
# File 'lib/jruby-kafka/producer.rb', line 17 def @options end |
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
17 18 19 |
# File 'lib/jruby-kafka/producer.rb', line 17 def producer @producer end |
#send_method ⇒ Object (readonly)
Returns the value of attribute send_method.
17 18 19 |
# File 'lib/jruby-kafka/producer.rb', line 17 def send_method @send_method end |
Instance Method Details
#close ⇒ Object
53 54 55 |
# File 'lib/jruby-kafka/producer.rb', line 53 def close @producer.close end |
#connect ⇒ Object
38 39 40 41 |
# File 'lib/jruby-kafka/producer.rb', line 38 def connect @producer = KAFKA_PRODUCER.new(ProducerConfig.new Kafka::Utility.java_properties @options) @send_method = producer.java_method :send, [KeyedMessage] end |
#send_msg(topic, key, msg) ⇒ Object
throws FailedToSendMessageException or if not connected, StandardError.
44 45 46 |
# File 'lib/jruby-kafka/producer.rb', line 44 def send_msg(topic, key, msg) send_method.call(KeyedMessage.new(topic, key, msg)) end |
#sendMsg(topic, key, msg) ⇒ Object
48 49 50 |
# File 'lib/jruby-kafka/producer.rb', line 48 def sendMsg(topic, key, msg) send_msg(topic, key, msg) end |