Class: ManageIQ::Messaging::Kafka::Client

Inherits:
Client
  • Object
Includes:
Common, BackgroundJob, Queue, Topic
Defined in:
lib/manageiq/messaging/kafka/client.rb

Overview

Messaging client implementation backed by Kafka. Do not instantiate this class directly; use the ManageIQ::Messaging::Client.open method instead.

Kafka specific connection options accepted by open method:

  • :client_ref (A reference string to identify the client)

  • :hosts (Array of Kafka cluster hosts, or)

For additional security options, please refer to github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka and github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka
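As a rough illustration of the connection options above, the sketch below assembles an option hash and passes it to ManageIQ::Messaging::Client.open. The `:protocol => :Kafka` entry, the host names, the `client_ref` value, and the helper `kafka_open_options` are assumptions that do not appear on this page, and a real deployment may need further options not shown here. The connection is attempted only when the gem is already loaded.

```ruby
# Hypothetical helper (not part of the gem): builds the option hash for
# ManageIQ::Messaging::Client.open from the Kafka-specific options listed above.
def kafka_open_options(hosts, client_ref)
  {
    :protocol   => :Kafka,        # assumption: selects the Kafka client
    :hosts      => Array(hosts),  # :hosts expects an Array of cluster hosts
    :client_ref => client_ref     # reference string identifying this client
  }
end

options = kafka_open_options(%w[kafka1.example.com kafka2.example.com], "inventory_worker")

# Connect only when the gem has been loaded, so the sketch stays runnable
# without a Kafka cluster at hand.
if defined?(ManageIQ::Messaging::Client)
  client = ManageIQ::Messaging::Client.open(options)
  begin
    # ... publish or subscribe here ...
  ensure
    client.close # release the underlying Kafka handles
  end
else
  puts options.inspect
end
```

Wrapping the work in `begin`/`ensure` is one way to make sure `close` runs even if publishing or subscribing raises.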

Kafka specific publish_message options:

  • :group_name (Used as Kafka partition_key)
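Because :group_name becomes the Kafka partition key, messages that share a :group_name would normally land on the same partition and keep their relative order. The toy model below only illustrates that idea: the CRC32 hash, the partition count, the key values, and the helper name `toy_partition_for` are assumptions, and the partitioner actually used by librdkafka may differ.

```ruby
require "zlib"

# Toy model: map a key to one of `partition_count` partitions by hashing.
# The real partitioner is chosen by librdkafka and may use a different hash.
def toy_partition_for(key, partition_count)
  Zlib.crc32(key) % partition_count
end

keys = %w[vm-1 vm-2 vm-1 vm-3 vm-1]
assignments = keys.map { |key| [key, toy_partition_for(key, 4)] }

# Every occurrence of the same key prints the same partition number.
assignments.each { |key, partition| puts "#{key} -> partition #{partition}" }
```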

Kafka specific subscribe_topic options:

  • :persist_ref (Used as Kafka group_id)

  • :session_timeout (Max time in seconds allowed to process a message, default is 30)

Kafka specific subscribe_messages options:

  • :max_bytes (Max batch size to read, default is 10 MB)

  • :session_timeout (Max time in seconds allowed to process a message, default is 30)

Without :persist_ref, every topic subscriber receives a copy of each message, but only while it is active. If multiple topic subscribers join with the same :persist_ref, each message is consumed by only one of them, which allows load balancing among the subscribers. In addition, any messages sent while all members of a :persist_ref group are offline are persisted and delivered when any member of the group comes back online. Each message is still copied and delivered to subscribers that belong to other :persist_ref groups or to no group.
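The delivery rules above can be sketched with a small in-memory model. This is not how Kafka assigns messages (Kafka distributes by partition, not per message), and it does not model offline persistence. The subscriber names, the round-robin choice within a group, and the `deliver` helper are all assumptions made for illustration.

```ruby
# Toy in-memory model of the delivery rules described above; no Kafka involved.
# Subscriber names and the round-robin choice within a group are assumptions.
Subscriber = Struct.new(:name, :persist_ref, :received)

def deliver(message, subscribers, counters)
  grouped, adhoc = subscribers.partition(&:persist_ref)

  # Subscribers without :persist_ref each receive their own copy.
  adhoc.each { |s| s.received << message }

  # Each :persist_ref group receives one copy, handed to a single member.
  grouped.group_by(&:persist_ref).each do |ref, members|
    index = counters[ref] % members.size
    members[index].received << message
    counters[ref] += 1
  end
end

subscribers = [
  Subscriber.new("a1", "group_a", []),
  Subscriber.new("a2", "group_a", []),
  Subscriber.new("b1", "group_b", []),
  Subscriber.new("solo", nil, [])
]

counters = Hash.new(0)
%w[m1 m2 m3 m4].each { |m| deliver(m, subscribers, counters) }
subscribers.each { |s| puts "#{s.name}: #{s.received.inspect}" }
```

In this model the two members of `group_a` split the four messages between them, while `b1` and `solo` each see all four.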

subscribe_background_job is currently not implemented.

Constant Summary

Constants included from Topic

Topic::GROUP_FOR_ADHOC_LISTENERS

Constants included from Queue

Queue::GROUP_FOR_QUEUE_MESSAGES

Instance Attribute Summary

Instance Method Summary

Methods inherited from Client

open, #publish_message, #publish_messages, #publish_topic, #subscribe_background_job, #subscribe_messages, #subscribe_topic

Instance Attribute Details

#encoding ⇒ Object

Returns the value of attribute encoding.



# File 'lib/manageiq/messaging/kafka/client.rb', line 49

def encoding
  @encoding
end

Instance Method Details

#ack(ack_ref) ⇒ Object



# File 'lib/manageiq/messaging/kafka/client.rb', line 51

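def ack(ack_ref)
  # Commit through the ack reference to acknowledge the message.
  ack_ref.commit
rescue Rdkafka::RdkafkaError => e
  logger.warn("ack failed with error #{e.message}")
  # A "no_offset" error is tolerated; any other error is re-raised.
  raise unless e.message =~ /no_offset/
end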

#close ⇒ Object



# File 'lib/manageiq/messaging/kafka/client.rb', line 58

def close
  @admin&.close
  @admin = nil

  @producer&.close
  @producer = nil

  @consumer&.close
  @consumer = nil
end
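The close pattern shown above (safe navigation followed by resetting the instance variable to nil) makes repeated calls harmless and skips handles that were never created. The sketch below reproduces that pattern with stand-in classes; `FakeHandle` and `ResourceOwner` are hypothetical names and are not part of the gem.

```ruby
# Stand-in for a Kafka handle; counts how many times it is closed.
class FakeHandle
  attr_reader :close_count

  def initialize
    @close_count = 0
  end

  def close
    @close_count += 1
  end
end

# Hypothetical owner that mirrors the close pattern shown above.
class ResourceOwner
  def initialize(producer, consumer)
    @producer = producer
    @consumer = consumer
  end

  def close
    @producer&.close # skipped when the handle is nil
    @producer = nil
    @consumer&.close
    @consumer = nil
  end
end

producer = FakeHandle.new
owner = ResourceOwner.new(producer, nil) # the consumer was never created
owner.close
owner.close # the second call finds only nil handles and does nothing
puts producer.close_count
```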

#topics ⇒ Object

Lists all topics.



# File 'lib/manageiq/messaging/kafka/client.rb', line 70

def topics
  admin..topics.map { |topic| topic[:topic_name] }
end