Class: Emque::Producing::Publisher::Kafka

Inherits:
Base
  • Object
show all
Defined in:
lib/emque/producing/publisher/kafka.rb

Instance Method Summary collapse

Methods inherited from Base

#handle_error, #host_name

Constructor Details

#initializeKafka



7
8
9
10
11
12
# File 'lib/emque/producing/publisher/kafka.rb', line 7

def initialize
  @producer = Poseidon::Producer.new(
    Emque::Producing.configuration.kafka_options[:seed_brokers],
    "producer_#{host_name}_#{Process.pid}",
    Emque::Producing.configuration.kafka_options[:producer_options])
end

Instance Method Details

#publish(topic, message_type, message, key = nil) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/emque/producing/publisher/kafka.rb', line 14

def publish(topic, message_type, message, key = nil)
  begin
    msg = Poseidon::MessageToSend.new(topic, message, key)
    @producer.send_messages([msg])
  rescue => e
    handle_error(e)
  end
end