Module: Rimless::KafkaHelpers
Overview
The top-level Apache Kafka helpers.
Class Method Summary
- .async_message(data:, schema:, topic:, **args) ⇒ Object
  Send a single Avro-encoded message to Apache Kafka (asynchronous, non-blocking).
- .async_raw_message(data:, topic:, **args) ⇒ Object
  Send a single pre-encoded message to Apache Kafka (asynchronous, non-blocking).
- .sync_message(data:, schema:, topic:, **args) ⇒ Object (also: message)
  Send a single Avro-encoded message to Apache Kafka (synchronous, blocking).
- .sync_raw_message(data:, topic:, **args) ⇒ Object (also: raw_message)
  Send a single pre-encoded message to Apache Kafka (synchronous, blocking).
- .topic(*args) ⇒ String
  Generate a common topic name for Apache Kafka while taking care of configured prefixes.
Class Method Details
.async_message(data:, schema:, topic:, **args) ⇒ Object
Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is sent in an asynchronous, non-blocking way.
# File 'lib/rimless/kafka_helpers.rb', line 79
def async_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  async_raw_message(data: encoded, topic: topic, **args)
end
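The encode-then-forward flow of .async_message can be tried without a broker. The following is a minimal sketch, not the gem's code: `FakeHelpers` is hypothetical, JSON stands in for the Avro encoding done by `Rimless.encode`, and an array stands in for the WaterDrop producer. The `key:` argument is only an example of an extra keyword that is passed through untouched.

```ruby
require 'json'

# Hypothetical stand-in for the helpers; it records messages instead of sending them.
module FakeHelpers
  SENT = []

  # Stand-in for the raw helper: expects data that is already encoded.
  def self.async_raw_message(data:, topic:, **args)
    SENT << { data: data, topic: topic, **args }
  end

  # Encode first, then forward all remaining arguments unchanged.
  def self.async_message(data:, schema:, topic:, **args)
    # Assumption: JSON replaces the Avro encoding for illustration only.
    encoded = JSON.generate({ schema: schema, payload: data })
    async_raw_message(data: encoded, topic: topic, **args)
  end
end

FakeHelpers.async_message(data: { id: 1 }, schema: :user_v1, topic: :users, key: 'user-1')
```

After the call, `FakeHelpers::SENT` holds one entry whose `:data` is an encoded string and whose `:key` is the value given by the caller, which mirrors how the schema-aware helpers only add the encoding step on top of the raw ones.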
.async_raw_message(data:, topic:, **args) ⇒ Object
Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is sent in an asynchronous, non-blocking way.
# File 'lib/rimless/kafka_helpers.rb', line 108
def async_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::AsyncProducer.call(data, **args)
end
.sync_message(data:, schema:, topic:, **args) ⇒ Object Also known as: message
Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is sent in a synchronous, blocking way.
# File 'lib/rimless/kafka_helpers.rb', line 63
def sync_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  sync_raw_message(data: encoded, topic: topic, **args)
end
.sync_raw_message(data:, topic:, **args) ⇒ Object Also known as: raw_message
Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is sent in a synchronous, blocking way.
# File 'lib/rimless/kafka_helpers.rb', line 93
def sync_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::SyncProducer.call(data, **args)
end
.topic(*args) ⇒ String
Generate a common topic name for Apache Kafka while taking care of configured prefixes.
# File 'lib/rimless/kafka_helpers.rb', line 29
def topic(*args)
  opts = args.last
  name = args.first if [String, Symbol].member?(args.first.class)

  if opts.is_a?(Hash)
    # When we got a full name, we use it as is
    return opts[:full_name] if opts.key? :full_name

    name = opts[:name] if opts.key?(:name)
    app = opts[:app] if opts.key?(:app)
  end

  name ||= nil
  app ||= Rimless.configuration.app_name

  raise ArgumentError, 'No name given' if name.nil?

  "#{Rimless.topic_prefix(app)}#{name}".tr('_', '-')
end
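To make the argument handling concrete, the following sketch reproduces the lookup order outside the gem. `TopicSketch`, `APP_NAME`, and the `topic_prefix` stand-in are hypothetical, and the `test.<app>.` prefix format is an assumption made for illustration; the real prefix comes from `Rimless.topic_prefix` and the configured application name.

```ruby
# Stand-alone sketch of the topic name resolution; not the gem's code.
module TopicSketch
  # Hypothetical default, standing in for Rimless.configuration.app_name.
  APP_NAME = 'my_app'

  # Hypothetical stand-in for Rimless.topic_prefix; the format is an assumption.
  def self.topic_prefix(app)
    "test.#{app}."
  end

  def self.topic(*args)
    opts = args.last
    # A leading String or Symbol is taken as the relative topic name.
    name = args.first if [String, Symbol].member?(args.first.class)

    if opts.is_a?(Hash)
      # A full name bypasses prefixing and normalization entirely.
      return opts[:full_name] if opts.key?(:full_name)

      name = opts[:name] if opts.key?(:name)
      app = opts[:app] if opts.key?(:app)
    end

    app ||= APP_NAME
    raise ArgumentError, 'No name given' if name.nil?

    # Underscores are replaced in the whole string, prefix included.
    "#{topic_prefix(app)}#{name}".tr('_', '-')
  end
end

TopicSketch.topic(:user_events)                     # => "test.my-app.user-events"
TopicSketch.topic(name: 'payments', app: 'billing') # => "test.billing.payments"
TopicSketch.topic(full_name: 'legacy_topic')        # => "legacy_topic"
```

Note that a `:full_name` is returned unchanged, so its underscores survive, while every other form has underscores replaced by dashes in both the prefix and the name.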