Module: Rimless::KafkaHelpers

Extended by:
ActiveSupport::Concern
Included in:
Rimless
Defined in:
lib/rimless/kafka_helpers.rb

Overview

The top-level Apache Kafka helpers.

Class Method Details

.async_message(data:, schema:, topic:, **args) ⇒ Object

Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to adjust the application details. The message is sent in an asynchronous, non-blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • schema (String, Symbol)

    the Apache Avro schema to use

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



# File 'lib/rimless/kafka_helpers.rb', line 79

def async_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  async_raw_message(data: encoded, topic: topic, **args)
end
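
The encode-then-delegate flow above can be sketched without Rimless, WaterDrop, or a Kafka broker. In this minimal sketch, `FakeProducer`, `MessageSketch`, and the JSON-based `encode` are hypothetical stand-ins introduced only for illustration (Rimless encodes with Apache Avro, not JSON), and topic resolution is reduced to `to_s`.

```ruby
require 'json'

# Hypothetical stand-in for WaterDrop::AsyncProducer; it only records calls.
class FakeProducer
  @calls = []

  class << self
    attr_reader :calls

    def call(payload, **options)
      @calls << { payload: payload, options: options }
    end
  end
end

module MessageSketch
  # Hypothetical stand-in for Rimless.encode; JSON replaces Avro here.
  def self.encode(data, schema:)
    JSON.generate({ schema: schema.to_s, data: data })
  end

  # Mirrors async_raw_message: the payload is handed over untouched.
  # Assumption: topic resolution is simplified to a plain string conversion.
  def self.async_raw_message(data:, topic:, **args)
    FakeProducer.call(data, **args.merge(topic: topic.to_s))
  end

  # Mirrors async_message: encode first, then delegate to the raw variant.
  def self.async_message(data:, schema:, topic:, **args)
    encoded = encode(data, schema: schema)
    async_raw_message(data: encoded, topic: topic, **args)
  end
end

MessageSketch.async_message(data: { id: 1 }, schema: :user_v1, topic: :users)
recorded = FakeProducer.calls.last
puts recorded[:payload]
puts recorded[:options].inspect
```

The producer only ever sees the encoded string, which is why the raw variants can be used directly when the payload is encoded elsewhere.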

.async_raw_message(data:, topic:, **args) ⇒ Object

Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to adjust the application details. The message is sent in an asynchronous, non-blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the message payload, already encoded by the caller and passed on unchanged

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



# File 'lib/rimless/kafka_helpers.rb', line 108

def async_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::AsyncProducer.call(data, **args)
end
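
The expression `topic(topic)` in the listing above relies on a Ruby detail: a call with parentheses and an argument is parsed as a method call, even when a local variable (here the `topic:` keyword parameter) has the same name. A minimal sketch of that behaviour follows; `ShadowSketch`, the `prefix.` string, and the extra `key:` option are hypothetical and only serve the illustration.

```ruby
module ShadowSketch
  # Hypothetical stand-in for the .topic helper.
  def self.topic(name)
    "prefix.#{name}"
  end

  # The keyword parameter `topic` shadows the method name, but
  # `topic(topic)` still invokes the method with the local value.
  def self.build_args(topic:, **args)
    args.merge(topic: topic(topic))
  end
end

merged = ShadowSketch.build_args(topic: :users, key: 'user-1')
puts merged.inspect
```

Any additional keyword arguments are left untouched by the merge; only the `:topic` entry is replaced by the resolved name.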

.sync_message(data:, schema:, topic:, **args) ⇒ Object Also known as: message

Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to adjust the application details. The message is sent in a synchronous, blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • schema (String, Symbol)

    the Apache Avro schema to use

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



# File 'lib/rimless/kafka_helpers.rb', line 63

def sync_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  sync_raw_message(data: encoded, topic: topic, **args)
end

.sync_raw_message(data:, topic:, **args) ⇒ Object Also known as: raw_message

Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to adjust the application details. The message is sent in a synchronous, blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the message payload, already encoded by the caller and passed on unchanged

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



# File 'lib/rimless/kafka_helpers.rb', line 93

def sync_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::SyncProducer.call(data, **args)
end

.topic(*args) ⇒ String

Generate a common topic name for Apache Kafka while taking care of configured prefixes.

Examples:

Name only

Rimless.topic(:users)

Name with app

Rimless.topic(:users, app: 'test-api')

Mix and match

Rimless.topic(name: 'test', app: :fancy_app)

Full name - use as is

Rimless.topic(full_name: 'my.custom.topic.name')

Parameters:

  • args (Array<Mixed>)

    the relative topic name, optionally followed by (or replaced by) an options hash with the keys :name, :app and :full_name

Returns:

  • (String)

    the complete topic name

Raises:

  • (ArgumentError)

    when no topic name is given


# File 'lib/rimless/kafka_helpers.rb', line 29

def topic(*args)
  opts = args.last
  name = args.first if [String, Symbol].member?(args.first.class)

  if opts.is_a?(Hash)
    # When we got a full name, we use it as is
    return opts[:full_name] if opts.key? :full_name

    name = opts[:name] if opts.key?(:name)
    app = opts[:app] if opts.key?(:app)
  end

  name ||= nil
  app ||= Rimless.configuration.app_name

  raise ArgumentError, 'No name given' if name.nil?

  "#{Rimless.topic_prefix(app)}#{name}".tr('_', '-')
end
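
To make the resolution rules of the listing above concrete, here is a standalone sketch that runs without Rimless. The prefix format (`"<env>.<app>."`), the environment name `test`, and the default application name `my-app` are assumptions made for illustration; the real values come from `Rimless.topic_prefix` and `Rimless.configuration.app_name`.

```ruby
module TopicSketch
  DEFAULT_APP = 'my-app' # hypothetical stand-in for Rimless.configuration.app_name
  ENV_NAME = 'test'      # hypothetical environment name

  # Hypothetical stand-in for Rimless.topic_prefix; the format is assumed.
  def self.topic_prefix(app)
    "#{ENV_NAME}.#{app}."
  end

  def self.topic(*args)
    opts = args.last
    name = args.first if [String, Symbol].member?(args.first.class)

    if opts.is_a?(Hash)
      # A full name is returned as is, without prefix or character replacement
      return opts[:full_name] if opts.key?(:full_name)

      name = opts[:name] if opts.key?(:name)
      app = opts[:app] if opts.key?(:app)
    end

    app ||= DEFAULT_APP
    raise ArgumentError, 'No name given' if name.nil?

    # Underscores are replaced in the whole string, prefix included
    "#{topic_prefix(app)}#{name}".tr('_', '-')
  end
end

puts TopicSketch.topic(:users)                            # test.my-app.users
puts TopicSketch.topic(:users, app: 'test-api')           # test.test-api.users
puts TopicSketch.topic(name: 'test', app: :fancy_app)     # test.fancy-app.test
puts TopicSketch.topic(full_name: 'my.custom.topic.name') # my.custom.topic.name
```

Calling `TopicSketch.topic` with no arguments, or with a hash that has neither `:name` nor `:full_name`, raises `ArgumentError`, matching the `raise` line in the listing.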