WaterDrop
Gem used to send messages to Kafka in an easy way with an extra validation layer. It is a part of the Karafka ecosystem.
WaterDrop is based on Zendesks delivery_boy gem.
It is:
- Thread safe
- Supports sync and async producers
- Working with 0.10.1+ Kafka
Installation
gem install waterdrop
or add this to your Gemfile:
gem 'waterdrop'
and run
bundle install
Setup
WaterDrop is a complex tool, that contains multiple configuration options. To keep everything organized, all the configuration options were divided into two groups:
- WaterDrop options - options directly related to Karafka framework and it's components
- Ruby-Kafka driver options - options related to Ruby-Kafka/Delivery boy
To apply all those configuration options, you need to use the #setup
method:
WaterDrop.setup do |config|
config.deliver = true
config.kafka.seed_brokers = %w[kafka://localhost:9092]
end
WaterDrop configuration options
Option | Description |
---|---|
client_id | This is how the client will identify itself to the Kafka brokers |
logger | Logger that we want to use |
deliver | Should we send messages to Kafka |
Ruby-Kafka driver and Delivery boy configuration options
Note: We've listed here only the most important configuration options. If you're interested in all the options, please go to the config.rb file for more details.
Note: All the options are subject to validations. In order to check what is and what is not acceptable, please go to the config.rb validation schema file.
Option | Description |
---|---|
raise_on_buffer_overflow | Should we raise an exception, when messages can't be sent in an async way due to the message buffer overflow or should we just drop them |
delivery_interval | The number of seconds between background message deliveries. Disable timer-based background deliveries by setting this to 0. |
delivery_threshold | The number of buffered messages that will trigger a background message delivery. Disable buffer size based background deliveries by setting this to 0. |
required_acks | The number of Kafka replicas that must acknowledge messages before they're considered as successfully written. |
ack_timeout | A timeout executed by a broker when the client is sending messages to it. |
max_retries | The number of retries when attempting to deliver messages. |
retry_backoff | The number of seconds to wait after a failed attempt to send messages to a Kafka broker before retrying. |
max_buffer_bytesize | The maximum number of bytes allowed in the buffer before new messages are rejected. |
max_buffer_size | The maximum number of messages allowed in the buffer before new messages are rejected. |
max_queue_size | The maximum number of messages allowed in the queue before new messages are rejected. |
sasl_plain_username | The username used to authenticate. |
sasl_plain_password | The password used to authenticate. |
This configuration can be also placed in config/initializers and can vary based on the environment:
WaterDrop.setup do |config|
config.deliver = Rails.env.production?
config.kafka.seed_brokers = [Rails.env.production? ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
end
Usage
To send Kafka messages, just use one of the producers:
WaterDrop::SyncProducer.call('message', topic: 'my-topic')
# or for async
WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
Both SyncProducer
and AsyncProducer
accept following options:
Option | Required | Value type | Description |
---|---|---|---|
topic |
true | String | The Kafka topic that should be written to |
key |
false | String | The key that should be set in the Kafka message |
partition |
false | Integer | A specific partition number that should be written to |
partition_key |
false | String | A string that can be used to deterministically select the partition |
create_time |
false | Time | The timestamp that should be set on the message |
Keep in mind, that message you want to send should be either binary or stringified (to_s, to_json, etc).
References
Note on contributions
First, thank you for considering contributing to WaterDrop! It's people like you that make the open source community such a great community!
Each pull request must pass all the RSpec specs and meet our quality requirements.
To check if everything is as it should be, we use Coditsu that combines multiple linters and code analyzers for both code and documentation. Once you're done with your changes, submit a pull request.
Coditsu will automatically check your work against our quality standards. You can find your commit check results on the builds page of WaterDrop repository.