Class: Poseidon::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/poseidon/producer.rb

Overview

Provides a high level interface for sending messages to a cluster of Kafka brokers.

Producer Creation

Producer requires a broker list and a client_id:

producer = Producer.new(["broker1:port1", "broker2:port1"], "my_client_id",
                        :type => :sync)

The broker list is only used to bootstrap our knowledge of the cluster -- it does not need to contain every broker. The client id should be unique across all clients in the cluster.

Sending Messages

Messages must have a topic before being sent:

messages = []
messages << Poseidon::MessageToSend.new("topic1", "Hello Word")
messages << Poseidon::MessageToSend.new("user_updates_topic", user.update, user.id)
producer.send_messages(messages)

Producer Types

There are two types of producers: sync and async. They can be specified via the :type option when creating a producer.

Sync Producer

The :sync producer blocks while sends messages to the cluster. The more messages you can send per #send_messages call the more efficient it will be.

Compression

When creating the producer you can specify a compression method:

producer = Producer.new(["broker1:port1"], "my_client_id",
                        :type => :sync, :compression_codec => :gzip)

If you don't specify which topics to compress it will compress all topics. You can specify a set of topics to compress when creating the producer:

producer = Producer.new(["broker1:port1"], "my_client_id",
                        :type => :sync, :compression_codec => :gzip,
                        :compressed_topics => ["compressed_topic_1"])

Partitioning

For keyless messages the producer will round-robin messages to all available partitions for at topic. This means that if we are unable to send messages to a specific broker we'll retry sending those to a different broker.

However, if you specify a key when creating the message, the producer will choose a partition based on the key and only send to that partition.

Custom Partitioning

You may also specify a custom partitioning scheme for messages by passing a Proc (or any object that responds to #call) to the Producer. The proc must return a Fixnum >= 0 and less-than partition_count.

my_partitioner = Proc.new { |key, partition_count|  Zlib::crc32(key) % partition_count }

producer = Producer.new(["broker1:port1", "broker2:port1"], "my_client_id",
                        :type => :sync, :partitioner => my_partitioner)

Instance Method Summary collapse

Constructor Details

#initialize(brokers, client_id, options = {}) ⇒ Producer

Returns a new Producer.

Parameters:

  • brokers (Array<String>)

    An array of brokers in the form "host1:port1"

  • client_id (String)

    A client_id used to indentify the producer.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :type (:sync / :async) — default: :sync

    Whether we should send messages right away or queue them and send them in the background.

  • :compression_codec (:gzip / :snappy / :none) — default: :none

    Type of compression to use.

  • :compressed_topics (Enumberable<String>) — default: nil

    Topics to compress. If this is not specified we will compress all topics provided that +:compression_codec+ is set.

  • :metadata_refresh_interval_ms (Integer: Milliseconds) — default: 600_000

    How frequently we should update the topic metadata in milliseconds.

  • :partitioner (#call, nil)

    Object which partitions messages based on key. Responds to #call(key, partition_count).

  • :max_send_retries (Integer) — default: 3

    Number of times to retry sending of messages to a leader.

  • :retry_backoff_ms (Integer) — default: 100

    The amount of time (in milliseconds) to wait before refreshing the metadata after we are unable to send messages. Number of times to retry sending of messages to a leader.

  • :required_acks (Integer) — default: 0

    The number of acks required per request.

  • :ack_timeout_ms (Integer) — default: 1500

    How long the producer waits for acks.

  • :socket_timeout_ms] (Integer) — default: 10000

    How long the producer socket waits for any reply from server.


136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/poseidon/producer.rb', line 136

def initialize(brokers, client_id, options = {})
  options = options.dup
  validate_options(options)

  if !brokers.respond_to?(:each)
    raise ArgumentError, "brokers must respond to #each"
  end
  @brokers    = brokers
  @client_id  = client_id
  @producer   = build_producer(options)
  @shutdown   = false
end

Instance Method Details

#closeObject Also known as: shutdown

Closes all open connections to brokers


167
168
169
170
# File 'lib/poseidon/producer.rb', line 167

def close
  @shutdown = true
  @producer.close
end

#send_messages(messages) ⇒ Boolean

Send messages to the cluster. Raises an exception if the producer fails to send the messages.

Parameters:

  • messages (Enumerable<MessageToSend>)

    Messages must have a +topic+ set and may have a +key+ set.

Returns:

  • (Boolean)

Raises:


157
158
159
160
161
162
163
164
# File 'lib/poseidon/producer.rb', line 157

def send_messages(messages)
  raise Errors::ProducerShutdownError if @shutdown
  if !messages.respond_to?(:each)
    raise ArgumentError, "messages must respond to #each"
  end

  @producer.send_messages(convert_to_messages_objects(messages))
end