Class: ManageIQ::Messaging::Kafka::Client
- Includes:
- Common, BackgroundJob, Queue, Topic
- Defined in:
- lib/manageiq/messaging/kafka/client.rb
Overview
Messaging client implementation with Kafka as the underlying messaging system. Do not instantiate this class directly; use the ManageIQ::Messaging::Client.open method instead.
Kafka-specific connection options accepted by the open method:
- :client_ref (A reference string to identify the client)
- :hosts (Array of Kafka cluster hosts, or)
For additional security options, please refer to github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka and github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka
Kafka-specific publish_message options:
- :group_name (Used as Kafka partition_key)
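Because :group_name is used as the partition key, messages published with the same :group_name land on the same partition and keep their relative order. The sketch below only illustrates that property: the CRC32-modulo hash, the `partition_for` helper, and the partition count are assumptions made for the example, and the real partition is chosen by the underlying Kafka client library.

```ruby
require 'zlib'

# Illustrative only: map a partition key to a partition by hashing the key
# and taking it modulo the partition count. The actual partitioner is
# selected by the Kafka client library, not by this code.
def partition_for(partition_key, partition_count)
  Zlib.crc32(partition_key) % partition_count
end

PARTITION_COUNT = 6 # hypothetical number of partitions in the topic

# Hypothetical :group_name values; repeated names map to the same partition.
%w[provider_1 provider_2 provider_1].each do |group_name|
  puts "#{group_name} -> partition #{partition_for(group_name, PARTITION_COUNT)}"
end
```

Any deterministic hash gives the same guarantee: an equal key always yields an equal partition, so ordering is preserved per :group_name but not across different names.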
Kafka-specific subscribe_topic options:
- :persist_ref (Used as Kafka group_id)
- :session_timeout (Max time in seconds allowed to process a message, default is 30)
Kafka-specific subscribe_messages options:
- :max_bytes (Max batch size to read, default is 10 MB)
- :session_timeout (Max time in seconds allowed to process a message, default is 30)
Without :persist_ref, every topic subscriber receives its own copy of each message, but only while it is active. If multiple topic subscribers join with the same :persist_ref, each message is consumed by only one of them, which allows load balancing among the subscribers. In addition, any messages sent while all members of a :persist_ref group are offline are persisted and delivered when any member of the group comes back online. Each message is still copied and delivered to subscribers that belong to other :persist_ref groups or to no group.
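The delivery rules above can be sketched with a small in-memory model. This is a toy simulation, not the gem's API: `Subscriber`, `deliver`, and the round-robin choice within a group are assumptions made for illustration (Kafka itself balances a group by assigning partitions to its members), and offline persistence is not modelled.

```ruby
# Toy in-memory model of the delivery rules; all names here are hypothetical.
Subscriber = Struct.new(:name, :persist_ref, :received)

def deliver(message, subscribers, counters)
  grouped, adhoc = subscribers.partition(&:persist_ref)

  # Subscribers without a :persist_ref each receive their own copy.
  adhoc.each { |subscriber| subscriber.received << message }

  # Within each :persist_ref group exactly one member receives the message.
  # Round-robin is used here purely to make the balancing visible.
  grouped.group_by(&:persist_ref).each do |ref, members|
    members[counters[ref] % members.size].received << message
    counters[ref] += 1
  end
end

subscribers = [
  Subscriber.new('a1', 'group_a', []),
  Subscriber.new('a2', 'group_a', []),
  Subscriber.new('b1', 'group_b', []),
  Subscriber.new('adhoc', nil, [])
]

counters = Hash.new(0)
%w[m1 m2 m3 m4].each { |message| deliver(message, subscribers, counters) }

subscribers.each { |s| puts "#{s.name}: #{s.received.inspect}" }
```

In this model the two members of group_a split the four messages between them, while b1 and the ungrouped subscriber each see all four.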
subscribe_background_job is currently not implemented.
Constant Summary
Constants included from Topic
Topic::GROUP_FOR_ADHOC_LISTENERS
Constants included from Queue
Queue::GROUP_FOR_QUEUE_MESSAGES
Instance Attribute Summary
- #encoding ⇒ Object
  Returns the value of attribute encoding.
Instance Method Summary
- #ack(ack_ref) ⇒ Object
- #close ⇒ Object
- #topics ⇒ Object
  Lists all topics.
Methods inherited from Client
open, #publish_message, #publish_messages, #publish_topic, #subscribe_background_job, #subscribe_messages, #subscribe_topic
Instance Attribute Details
#encoding ⇒ Object
Returns the value of attribute encoding.
    # File 'lib/manageiq/messaging/kafka/client.rb', line 49
    def encoding
      @encoding
    end
Instance Method Details
#ack(ack_ref) ⇒ Object
    # File 'lib/manageiq/messaging/kafka/client.rb', line 51
    def ack(ack_ref)
      ack_ref.commit
    rescue Rdkafka::RdkafkaError => e
      # Log every commit failure; only no_offset errors are swallowed.
      logger.warn("ack failed with error #{e.message}")
      raise unless e.message =~ /no_offset/
    end
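The rescue logic in #ack logs every commit failure but re-raises only errors that do not match /no_offset/. The sketch below reproduces that shape with stand-ins so it runs without a Kafka broker: `FakeCommitError`, `FakeAckRef`, `safe_ack`, and the error strings are hypothetical, and matching on the exception's `message` is an assumption about what the original compares against.

```ruby
# Stand-ins for illustration only; the real classes come from the rdkafka gem.
class FakeCommitError < StandardError; end

class FakeAckRef
  def initialize(error_message = nil)
    @error_message = error_message
  end

  # Raises when constructed with an error message, otherwise "commits".
  def commit
    raise FakeCommitError, @error_message if @error_message
    :committed
  end
end

# Same shape as #ack: record every failure, re-raise unless it is a
# no_offset error (assumed here to be visible in the exception message).
def safe_ack(ack_ref, log = [])
  ack_ref.commit
rescue FakeCommitError => e
  log << "ack failed with error #{e.message}"
  raise unless e.message =~ /no_offset/
end

puts safe_ack(FakeAckRef.new).inspect
puts safe_ack(FakeAckRef.new('hypothetical failure (no_offset)')).inspect

begin
  safe_ack(FakeAckRef.new('hypothetical failure (other_error)'))
rescue FakeCommitError => e
  puts "re-raised: #{e.message}"
end
```

The effect is that acknowledging a message with nothing left to commit is treated as harmless, while any other commit failure still reaches the caller.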
#close ⇒ Object
    # File 'lib/manageiq/messaging/kafka/client.rb', line 58
    def close
      @admin&.close
      @admin = nil

      @producer&.close
      @producer = nil

      @consumer&.close
      @consumer = nil
    end
#topics ⇒ Object
Lists all topics.
    # File 'lib/manageiq/messaging/kafka/client.rb', line 70
    def topics
      admin.metadata.topics.map { |topic| topic[:topic_name] }
    end