Class: Poseidon::Producer
- Inherits:
-
Object
- Object
- Poseidon::Producer
- Defined in:
- lib/poseidon/producer.rb
Overview
Provides a high level interface for sending messages to a cluster of Kafka brokers.
Producer Creation
Producer requires a broker list and a client_id:
producer = Producer.new(["broker1:port1", "broker2:port1"], "my_client_id",
:type => :sync)
The broker list is only used to bootstrap our knowledge of the cluster -- it does not need to contain every broker. The client id should be unique across all clients in the cluster.
Sending Messages
Messages must have a topic before being sent:
= []
<< Poseidon::MessageToSend.new("topic1", "Hello Word")
<< Poseidon::MessageToSend.new("user_updates_topic", user.update, user.id)
producer.()
Producer Types
There are two types of producers: sync and async. They can be specified via the :type option when creating a producer.
Sync Producer
The :sync producer blocks while sends messages to the cluster. The more messages you can send per #send_messages call the more efficient it will be.
Compression
When creating the producer you can specify a compression method:
producer = Producer.new(["broker1:port1"], "my_client_id",
:type => :sync, :compression_codec => :gzip)
If you don't specify which topics to compress it will compress all topics. You can specify a set of topics to compress when creating the producer:
producer = Producer.new(["broker1:port1"], "my_client_id",
:type => :sync, :compression_codec => :gzip,
:compressed_topics => ["compressed_topic_1"])
Partitioning
For keyless messages the producer will round-robin messages to all available partitions for at topic. This means that if we are unable to send messages to a specific broker we'll retry sending those to a different broker.
However, if you specify a key when creating the message, the producer will choose a partition based on the key and only send to that partition.
Custom Partitioning
You may also specify a custom partitioning scheme for messages by passing a Proc (or any object that responds to #call) to the Producer. The proc must return a Fixnum >= 0 and less-than partition_count.
my_partitioner = Proc.new { |key, partition_count| Zlib::crc32(key) % partition_count }
producer = Producer.new(["broker1:port1", "broker2:port1"], "my_client_id",
:type => :sync, :partitioner => my_partitioner)
Instance Method Summary collapse
-
#close ⇒ Object
(also: #shutdown)
Closes all open connections to brokers.
-
#initialize(brokers, client_id, options = {}) ⇒ Producer
constructor
Returns a new Producer.
-
#send_messages(messages) ⇒ Boolean
Send messages to the cluster.
Constructor Details
#initialize(brokers, client_id, options = {}) ⇒ Producer
Returns a new Producer.
136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/poseidon/producer.rb', line 136 def initialize(brokers, client_id, = {}) = .dup () if !brokers.respond_to?(:each) raise ArgumentError, "brokers must respond to #each" end @brokers = brokers @client_id = client_id @producer = build_producer() @shutdown = false end |
Instance Method Details
#close ⇒ Object Also known as: shutdown
Closes all open connections to brokers
167 168 169 170 |
# File 'lib/poseidon/producer.rb', line 167 def close @shutdown = true @producer.close end |
#send_messages(messages) ⇒ Boolean
Send messages to the cluster.
157 158 159 160 161 162 163 164 |
# File 'lib/poseidon/producer.rb', line 157 def () raise Errors::ProducerShutdownError if @shutdown if !.respond_to?(:each) raise ArgumentError, "messages must respond to #each" end @producer.(()) end |