Class: Messaging::Adapters::Kafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/adapters/kafka/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kafka_adapter) ⇒ Producer

Returns a new instance of Producer.



9
10
11
12
13
14
# File 'lib/messaging/adapters/kafka/producer.rb', line 9

# Builds a producer bound to the given Kafka adapter.
#
# Stores the adapter in @kafka, calls +connect+ to set up the initial
# producer, and registers an +at_exit+ hook that calls +shutdown+ when the
# process exits.
#
# @param kafka_adapter [Object] stored as-is and exposed through +kafka+;
#   presumably consumed by +create_producer+ (not shown here) — confirm
def initialize(kafka_adapter)
  @kafka = kafka_adapter
  connect

  # Registered only after +connect+ has returned, so a failing +connect+
  # leaves no exit hook behind.
  at_exit { shutdown }
end

Instance Attribute Details

#kafka ⇒ Object (readonly)

Returns the value of attribute kafka.



7
8
9
# File 'lib/messaging/adapters/kafka/producer.rb', line 7

# Reader for the Kafka adapter that was passed to the constructor.
#
# @return [Object] the value of @kafka
def kafka
  @kafka
end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



6
7
8
# File 'lib/messaging/adapters/kafka/producer.rb', line 6

# Reader for the process id recorded by the most recent +connect+.
#
# @return [Integer, nil] the value of @pid
def pid
  @pid
end

#producer ⇒ Object (readonly)

Returns the value of attribute producer.



5
6
7
# File 'lib/messaging/adapters/kafka/producer.rb', line 5

# Reader for the producer object created by the most recent +connect+.
#
# @return [Object, nil] the value of @producer
def producer
  @producer
end

Instance Method Details

#call(message) ⇒ Object

Delivers a message to Kafka asynchronously in a background thread. This method will return immediately.

Parameters:

  • message


20
21
22
23
24
25
# File 'lib/messaging/adapters/kafka/producer.rb', line 20

# Hands a message to the Kafka producer.
#
# When +forked?+ reports true the producer is first rebuilt via +reconnect+.
# The message is serialized with +to_json+ and produced using the message's
# +message_key+ and +topic+.
#
# A ::Kafka::BufferOverflow raised along the way is not propagated; it is
# passed to ExceptionHandler together with the serialized message.
#
# @param message [#to_json, #message_key, #topic] the message to deliver
# @return [Object] the result of +produce+, or of ExceptionHandler.call when
#   the buffer overflowed
def call(message)
  reconnect if forked?

  sink = producer
  payload = message.to_json
  sink.produce(payload, key: message.message_key, topic: message.topic)
rescue ::Kafka::BufferOverflow => overflow
  # Serialize again here: +payload+ may be unset if the overflow was raised
  # before the assignment above ran.
  ExceptionHandler.call(overflow, message: message.to_json)
end

#connect ⇒ Object Also known as: reconnect



34
35
36
37
# File 'lib/messaging/adapters/kafka/producer.rb', line 34

# Creates a fresh producer and records which process created it.
#
# @return [Integer] the current process id (the value of the last assignment)
def connect
  @producer = create_producer
  # Remember the creating process.
  # NOTE(review): presumably +forked?+ compares this with Process.pid — confirm.
  @pid = Process.pid
end

#shutdown ⇒ Object



27
28
29
30
31
32
# File 'lib/messaging/adapters/kafka/producer.rb', line 27

# Flushes buffered messages and shuts the producer down.
#
# Does nothing when no producer is set. If flushing raises, the producer is
# still shut down so its resources are released, and the original error is
# then re-raised to the caller.
#
# @return [Object, nil] nil when there is no producer, otherwise the result
#   of the producer's +shutdown+
# @raise [StandardError] whatever +deliver_messages+ raised, after the
#   producer has been shut down
def shutdown
  active = producer
  return unless active

  begin
    active.deliver_messages
  rescue StandardError
    # A failed final flush must not leave the producer open.
    active.shutdown
    raise
  end

  active.shutdown
end