Class: Hermann::Provider::JavaProducer
- Inherits: Object
- Defined in: lib/hermann/provider/java_producer.rb
Overview
This class simulates the Kafka producer class within a Java environment. If the producer throws an exception inside the Promise, calling .value! raises that exception and the promise's rejected flag is set to true.
Constant Summary
- DEFAULTS =
Default Kafka producer options
{ 'partitioner.class' => 'kafka.producer.DefaultPartitioner', 'request.required.acks' => '1', 'message.send.max.retries' => '0' }.freeze
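One plausible way caller options could be combined with these defaults is sketched below. The create_config method is not shown on this page, so the helper merged_options and the use of 'metadata.broker.list' as the broker key are assumptions made for illustration only.

```ruby
# Copied from the constant summary above.
DEFAULTS = {
  'partitioner.class'        => 'kafka.producer.DefaultPartitioner',
  'request.required.acks'    => '1',
  'message.send.max.retries' => '0'
}.freeze

# Hypothetical helper: caller options win over DEFAULTS, and the broker
# string is stored under 'metadata.broker.list' (an assumed key).
def merged_options(brokers, opts = {})
  DEFAULTS.merge(opts).merge('metadata.broker.list' => brokers)
end

options = merged_options('localhost:9092', { 'request.required.acks' => '0' })
puts options['request.required.acks']   # overridden by the caller
puts options['partitioner.class']       # inherited from DEFAULTS
```

Because DEFAULTS is frozen and Hash#merge returns a new hash, the constant itself is never modified by a call like this.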
Instance Attribute Summary
- #producer ⇒ Object
  Returns the value of attribute producer.
Instance Method Summary
- #connect(timeout = 0) ⇒ Object
  No-op for now.
- #connected? ⇒ Boolean
  No-op for now.
- #errored? ⇒ Boolean
  No-op for now.
- #initialize(brokers, opts = {}) ⇒ JavaProducer (constructor)
  Instantiate JavaProducer.
- #push_single(msg, topic, key, _) ⇒ Object
  Push a value onto the Kafka topic passed to this Producer.
Constructor Details
#initialize(brokers, opts = {}) ⇒ JavaProducer
Instantiate JavaProducer
Examples
JavaProducer.new('0:9092', => '1')
# File 'lib/hermann/provider/java_producer.rb', line 33
def initialize(brokers, opts={})
  config = create_config(brokers, opts)
  @producer = JavaApiUtil::Producer.new(config)
end
Instance Attribute Details
#producer ⇒ Object
Returns the value of attribute producer.
# File 'lib/hermann/provider/java_producer.rb', line 12
def producer
  @producer
end
Instance Method Details
#connect(timeout = 0) ⇒ Object
No-op for now; ignores the timeout and always returns nil.
# File 'lib/hermann/provider/java_producer.rb', line 73
def connect(timeout=0)
  nil
end
#connected? ⇒ Boolean
No-op for now; always returns false.
# File 'lib/hermann/provider/java_producer.rb', line 63
def connected?
  return false
end
#errored? ⇒ Boolean
No-op for now; always returns false.
# File 'lib/hermann/provider/java_producer.rb', line 68
def errored?
  return false
end
#push_single(msg, topic, key, _) ⇒ Object
Push a value onto the Kafka topic passed to this Producer. The send runs inside a Concurrent::Promise, which is returned to the caller. A FailedToSendMessageException is re-raised as Hermann::Errors::ConnectivityError and any other error as Hermann::Errors::GeneralError, each carrying the original exception under :java_exception.
# File 'lib/hermann/provider/java_producer.rb', line 46
def push_single(msg, topic, key, _)
  key = key && key.to_java
  Concurrent::Promise.execute {
    data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
    begin
      @producer.send(data)
    rescue Java::KafkaCommon::FailedToSendMessageException => jexc
      raise Hermann::Errors::ConnectivityError.new(jexc., :java_exception => jexc)
    rescue => e
      raise Hermann::Errors::GeneralError.new(e., :java_exception => e)
    end
  }
end
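The wrap-and-re-raise pattern in push_single can be illustrated in plain Ruby. This is only a sketch: SketchGeneralError is a hypothetical stand-in for Hermann::Errors::GeneralError (whose definition is not shown here), the java_exception reader is an assumed accessor, and using the original exception's message as the error text is likewise an assumption.

```ruby
# Stand-in for Hermann::Errors::GeneralError; the real class is not shown here.
class SketchGeneralError < StandardError
  attr_reader :java_exception   # assumed accessor for the wrapped cause

  def initialize(message, opts = {})
    super(message)
    @java_exception = opts[:java_exception]
  end
end

# Runs the block and re-raises any StandardError as a SketchGeneralError,
# keeping the original exception available to the caller.
def wrap_errors
  yield
rescue => e
  raise SketchGeneralError.new(e.message, { :java_exception => e })
end

begin
  wrap_errors { raise IOError, 'socket closed' }
rescue SketchGeneralError => err
  puts err.message                # socket closed
  puts err.java_exception.class   # IOError
end
```

Wrapping this way lets callers rescue a single library-level error class while still being able to inspect the underlying cause.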