Class: Kafka::FFI::Client
- Inherits: OpaquePointer
  - Object
  - OpaquePointer
  - Kafka::FFI::Client
- Defined in: lib/kafka/ffi/client.rb
Overview
Naming this is hard and librdkafka primarily just refers to it as “a handle” to an instance. It’s more akin to an internal service and this Client talks the API of that service.
Client is a handle to a configured librdkafka instance that begins operation once created. Client is an abstract class and will provide either a Consumer or Producer based on the type being created. Each Client instance can either produce or consume messages to / from topics and cannot do both.
Instance Attribute Summary
Attributes inherited from OpaquePointer
Class Method Summary collapse
- .from_native(ptr, _ctx) ⇒ Object
-
.new(type, config = nil) ⇒ Consumer, Producer
Create a new Client of type with the given configuration.
Instance Method Summary collapse
-
#alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>
Update the configuration for the specified resources.
-
#brokers_add(brokers) ⇒ Integer
Adds one or more brokers to the Client’s list of initial bootstrap brokers.
-
#cluster_id(timeout: 1000) ⇒ nil, String
Retrieves the Client’s Cluster ID.
-
#config ⇒ Config
Retrieve the current configuration used by Client.
-
#controller_id(timeout: 1000) ⇒ Integer
Retrieves the current Controller ID as reported by broker metadata.
-
#create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>
Create additional partition(s) for a topic on the cluster.
-
#create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>
Create topics in the cluster with the given configuration.
-
#default_topic_conf_dup ⇒ TopicConfig
Create a copy of the Client’s default topic configuration object.
-
#delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>
Delete a list of Topics from the cluster.
-
#describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>
Get configuration for the specified resources.
-
#destroy ⇒ Object
Release all of the resources used by this Client.
-
#get_background_queue ⇒ Queue?
Get a reference to the background thread queue.
-
#get_main_queue ⇒ Queue
Get a reference to the main librdkafka event queue.
-
#group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList
List and describe client groups in the cluster.
-
#initialize(ptr) ⇒ Client
constructor
A new instance of Client.
-
#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
Retrieve metadata from the Kafka cluster.
-
#name ⇒ String
Retrieve the Kafka handle name.
-
#offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList
Look up the offsets for the given partition by timestamp.
-
#outq_len ⇒ Integer
(also: #out_queue_len)
Returns the current length of the outbound queue.
-
#pause_partitions(list) ⇒ TopicPartitionList
Pause producing or consuming of the provided list of partitions.
-
#poll(timeout: 250) ⇒ Integer
Polls for events on the Client, causing callbacks to be fired.
-
#query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range
Query the broker for the oldest and newest offsets for the partition.
-
#resume_partitions(list) ⇒ TopicPartitionList
Resume producing or consuming of the provided list of partitions.
-
#set_log_queue(dest) ⇒ Object
Forward librdkafka and debug logs to the specified queue.
-
#topic(name, config = nil) ⇒ Topic
Create or fetch the Topic with the given name.
Methods inherited from OpaquePointer
Constructor Details
#initialize(ptr) ⇒ Client
Returns a new instance of Client.
# File 'lib/kafka/ffi/client.rb', line 101

def initialize(ptr)
  super(ptr)

  # Caches Topics created on the first call to #topic below. Topics need to
  # be destroyed before destroying the Client. We keep a set in the client
  # so end users don't need to think about it.
  @topics = {}
end
Class Method Details
.from_native(ptr, _ctx) ⇒ Object
# File 'lib/kafka/ffi/client.rb', line 69

def self.from_native(ptr, _ctx)
  if !ptr.is_a?(::FFI::Pointer)
    raise TypeError, "from_native can only convert from a ::FFI::Pointer to #{self}"
  end

  # Converting from a null pointer should return nil. Likely this was
  # caused by rd_kafka_new returning an error and a NULL pointer for the
  # Client.
  if ptr.null?
    return nil
  end

  # Build a temporary Client to pass to rd_kafka_type. There is a bit of a
  # chicken and egg problem here. We can't create the final class until
  # after we know the type. But for type safety we want to pass a Client.
  cfg = allocate
  cfg.send(:initialize, ptr)

  type = ::Kafka::FFI.rd_kafka_type(cfg)
  klass =
    case type
    when :producer then Producer
    when :consumer then Consumer
    else
      raise ArgumentError, "unknown Kafka client type: #{type}"
    end

  client = klass.allocate
  client.send(:initialize, ptr)
  client
end
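The allocate-then-`send(:initialize, ...)` dance in `from_native` is plain Ruby metaprogramming, not anything FFI-specific: it lets a factory choose the concrete subclass after inspecting the value it is constructing. A self-contained sketch of the same pattern, where `Handle`, `ProducerHandle`, and `ConsumerHandle` are illustrative stand-ins and not kafka-ffi classes:

```ruby
# Minimal illustration of the allocate/initialize factory pattern used by
# Client.from_native: the type is only known after construction, so allocate
# an instance first, then run initialize explicitly once the class is chosen.
class Handle
  attr_reader :ptr

  def initialize(ptr)
    @ptr = ptr
  end
end

class ProducerHandle < Handle; end
class ConsumerHandle < Handle; end

def build_handle(ptr, type)
  klass =
    case type
    when :producer then ProducerHandle
    when :consumer then ConsumerHandle
    else raise ArgumentError, "unknown type: #{type}"
    end

  # allocate skips initialize, so we invoke it ourselves via send since
  # initialize is private.
  handle = klass.allocate
  handle.send(:initialize, ptr)
  handle
end
```

In `from_native` the extra twist is that a temporary instance must be built first just to ask librdkafka for the type; the sketch above takes the type as a parameter to stay broker-free.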
.new(type, config = nil) ⇒ Consumer, Producer
Create a new Client of type with the given configuration.
# File 'lib/kafka/ffi/client.rb', line 34

def self.new(type, config = nil)
  error = ::FFI::MemoryPointer.new(:char, 512)

  # Convenience for passing in a Kafka::Config instead of building a
  # Kafka::FFI::Config since Kafka::Config provides a way to create a
  # config from a Hash.
  config =
    case config
    when Config, nil then config
    when ::Kafka::Config then config.to_ffi
    when Hash then ::Kafka::Config.new(config).to_ffi
    else
      raise ArgumentError, "config must be one of nil, Config, ::Kafka::Config, or Hash"
    end

  client = Kafka::FFI.rd_kafka_new(type, config, error, error.size)
  if client.nil?
    raise Error, error.read_string
  end

  if config
    # Store a reference to the config on the Client instance. We do this to
    # tie the Config's lifecycle to the Client instance in Ruby since they
    # are already tied in librdkafka. This ensures that any Ruby objects
    # referenced in the config (like callbacks) are not garbage collected.
    #
    # Using instance_variable_set to avoid exposing an API method that
    # could cause confusion from end users since the config cannot be
    # changed after initialization.
    client.instance_variable_set(:@config, config)
  end

  client
end
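The case/when coercion at the top of `Client.new` is a general input-normalization pattern: accept several convenient shapes and reduce them to one canonical form before crossing the FFI boundary. A self-contained sketch with hypothetical stand-in classes (`FFIConfig` and `AppConfig` mirror the roles of `Kafka::FFI::Config` and `Kafka::Config` but are not the real API):

```ruby
# Stand-in for the low-level (FFI) config object.
FFIConfig = Struct.new(:opts)

# Stand-in for the high-level config that knows how to build a Hash into
# the low-level form.
class AppConfig
  def initialize(opts)
    @opts = opts
  end

  def to_ffi
    FFIConfig.new(@opts)
  end
end

# Coerce nil / FFIConfig / AppConfig / Hash down to FFIConfig or nil,
# mirroring the case expression in Client.new.
def normalize_config(config)
  case config
  when FFIConfig, nil then config
  when AppConfig then config.to_ffi
  when Hash then AppConfig.new(config).to_ffi
  else
    raise ArgumentError, "config must be one of nil, FFIConfig, AppConfig, or Hash"
  end
end
```

The payoff is that every caller downstream of the coercion can assume exactly one type, which is what lets `rd_kafka_new` receive the config pointer without further checks.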
Instance Method Details
#alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>
AlterConfigs will replace all existing configuration for the given resources, reverting all unspecified config options to their default values.
At most one :broker type ConfigResource can be specified per call to alter_configs since the changes must be sent to the broker specified in the resource.
Update the configuration for the specified resources. Updates may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigResource (set via set_config) and reverting any unspecified config options to their default values.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 573

def alter_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # AlterConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_AlterConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
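alter_configs and the other admin calls below (create_partitions, create_topics, delete_topics, describe_configs) all share one shape: marshal an array of pointers, issue the request onto a private Queue, poll that queue for the result event, and release everything in an ensure block. A broker-free sketch of that shape, with `FakeQueue` as a stand-in for `Kafka::FFI::Queue`:

```ruby
# FakeQueue simulates the result queue the admin APIs poll: events are
# pushed by the request and shifted off by poll.
class FakeQueue
  def initialize
    @events = []
    @destroyed = false
  end

  def push(event)
    @events << event
  end

  def poll(timeout: 5000)
    @events.shift
  end

  def destroy
    @destroyed = true
  end

  def destroyed?
    @destroyed
  end
end

# Mirrors the request -> poll -> ensure-cleanup structure of the admin
# methods. The push stands in for e.g. rd_kafka_AlterConfigs delivering its
# result event to the queue.
def admin_request(queue)
  queue.push(:result_event)
  queue.poll(timeout: 5000) # block until the result event arrives
ensure
  queue.destroy # always release the queue, even on error
end
```

The ensure block is the important part: librdkafka queues and pointer arrays are unmanaged resources, so the real methods free them on every exit path, which is why the sketch does too.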
#brokers_add(brokers) ⇒ Integer
It is preferred to set brokers through the `metadata.broker.list` or `bootstrap.servers` config options.
Adds one or more brokers to the Client's list of initial bootstrap brokers. Additional brokers will be discovered automatically once the Client connects to a broker by querying the broker metadata.
# File 'lib/kafka/ffi/client.rb', line 430

def brokers_add(brokers)
  if brokers.is_a?(Array)
    brokers = brokers.join(",")
  end

  ::Kafka::FFI.rd_kafka_brokers_add(self, brokers)
end
#cluster_id(timeout: 1000) ⇒ nil, String
requires config `api.version.request` set to true
Retrieves the Client's Cluster ID.
Non-blocking call that will return immediately if metadata is cached.
# File 'lib/kafka/ffi/client.rb', line 136

def cluster_id(timeout: 1000)
  ptr = ::Kafka::FFI.rd_kafka_clusterid(self, timeout)

  if ptr.null?
    return nil
  end

  begin
    ptr.read_string
  ensure
    # Documentation explicitly says that the string needs to be freed.
    ::Kafka::FFI.rd_kafka_mem_free(self, ptr)
  end
end
#config ⇒ Config
The returned config is read-only and tied to the lifetime of the Client. Don’t try to modify or destroy the config.
Retrieve the current configuration used by Client.
# File 'lib/kafka/ffi/client.rb', line 116

def config
  ::Kafka::FFI.rd_kafka_conf(self)
end
#controller_id(timeout: 1000) ⇒ Integer
requires config `api.version.request` set to true
Retrieves the current Controller ID as reported by broker metadata.
# File 'lib/kafka/ffi/client.rb', line 159

def controller_id(timeout: 1000)
  ::Kafka::FFI.rd_kafka_controllerid(self, timeout)
end
#create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>
Create additional partition(s) for a topic on the cluster, optionally specifying broker assignments for the new partitions.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 523

def create_partitions(requests, options: nil, timeout: 5000)
  requests = Array(requests)

  # CreatePartitions wants an array of Admin::NewPartitions
  list = ::FFI::MemoryPointer.new(:pointer, requests.length)
  list.write_array_of_pointer(requests.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreatePartitions(self, list, requests.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>
Create topics in the cluster with the given configuration.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 455

def create_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # CreateTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreateTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#default_topic_conf_dup ⇒ TopicConfig
Create a copy of the Client’s default topic configuration object. The caller is now responsible for ownership of the new config.
393 394 395 |
# File 'lib/kafka/ffi/client.rb', line 393 def default_topic_conf_dup ::Kafka::FFI.rd_kafka_default_topic_conf_dup(self) end |
#delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>
Delete a list of Topics from the cluster.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 488

def delete_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # DeleteTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DeleteTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>
Get configuration for the specified resources.
Application is responsible for calling #destroy on the returned results when done with the results.
# File 'lib/kafka/ffi/client.rb', line 612

def describe_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # DescribeConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DescribeConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
#destroy ⇒ Object
Release all of the resources used by this Client. This may block until the instance has finished its shutdown procedure. Always make sure to destroy any associated resources and cleanly shut down the instance before calling destroy.
# File 'lib/kafka/ffi/client.rb', line 637

def destroy
  if !pointer.null?
    # Clean up any cached topics before destroying the Client.
    @topics.each do |_, topic|
      ::Kafka::FFI.rd_kafka_topic_destroy(topic)
    end
    @topics.clear

    ::Kafka::FFI.rd_kafka_destroy(self)
  end
end
#get_background_queue ⇒ Queue?
The returned Queue must not be polled, forwarded, or otherwise managed by the application. It may only be used as the destination queue passed to queue-enabled APIs.
The caller must call #destroy on the Queue when finished with it.
Get a reference to the background thread queue. The background queue is automatically polled by librdkafka and is fully managed internally.
# File 'lib/kafka/ffi/client.rb', line 274

def get_background_queue
  ::Kafka::FFI.rd_kafka_queue_get_background(self)
end
#get_main_queue ⇒ Queue
Application must call #destroy on this queue when finished.
Get a reference to the main librdkafka event queue. This is the queue that is served by rd_kafka_poll.
# File 'lib/kafka/ffi/client.rb', line 259

def get_main_queue
  ::Kafka::FFI.rd_kafka_queue_get_main(self)
end
#group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList
Application must call #destroy to release the list when done
List and describe client groups in the cluster.
# File 'lib/kafka/ffi/client.rb', line 376

def group_list(group: nil, timeout: 1000)
  list = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_list_groups(self, group, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  GroupList.new(list.read_pointer)
ensure
  list.free
end
#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata
Retrieve metadata from the Kafka cluster.
# File 'lib/kafka/ffi/client.rb', line 350

def metadata(local_only: false, topic: nil, timeout: 1000)
  ptr = ::FFI::MemoryPointer.new(:pointer)

  # Need to use a Topic reference if asking for only information about a
  # single topic.
  if topic.is_a?(String)
    topic = self.topic(topic)
  end

  err = ::Kafka::FFI.rd_kafka_metadata(self, local_only, topic, ptr, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Kafka::FFI::Metadata.new(ptr.read_pointer)
ensure
  ptr.free
end
#name ⇒ String
Retrieve the Kafka handle name.
# File 'lib/kafka/ffi/client.rb', line 123

def name
  ::Kafka::FFI.rd_kafka_name(self)
end
#offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList
Look up the offsets for the given partition by timestamp. The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the timestamp set in the TopicPartitionList.
# File 'lib/kafka/ffi/client.rb', line 327

def offsets_for_times(list, timeout: 1000)
  if list.nil?
    raise ArgumentError, "list cannot be nil"
  end

  err = ::Kafka::FFI.rd_kafka_offsets_for_times(self, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
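The lookup rule worth internalizing here is "earliest offset whose timestamp is greater than or equal to the requested timestamp". A pure-Ruby illustration of that rule over a toy partition log, where `Record` and the `log` array are illustrative stand-ins rather than kafka-ffi types:

```ruby
# A toy partition log: each record has an offset and a timestamp, ordered
# by offset (and therefore by timestamp, as in a real Kafka log segment).
Record = Struct.new(:offset, :timestamp)

# The rule offsets_for_times resolves on the broker: return the earliest
# offset whose timestamp is >= the requested timestamp, or nil if every
# record is older than the request.
def offset_for_time(records, ts)
  hit = records.find { |r| r.timestamp >= ts }
  hit && hit.offset
end

log = [
  Record.new(0, 100),
  Record.new(1, 150),
  Record.new(2, 200),
]
```

Note the boundary behavior: a timestamp exactly matching a record resolves to that record, while a timestamp just past it resolves to the next one.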
#outq_len ⇒ Integer Also known as: out_queue_len
Returns the current length of the outbound queue. This is the sum of several factors including outbound messages, pending callbacks, waiting acknowledgements, etc…
An application should wait for the return value to reach 0 before terminating to make sure outstanding messages, requests, callbacks, and events are fully processed.
# File 'lib/kafka/ffi/client.rb', line 406

def outq_len
  ::Kafka::FFI.rd_kafka_outq_len(self)
end
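The "wait for the return value to reach 0 before terminating" advice above implies a poll loop. A broker-free sketch of that drain loop, where `FakeClient` is a stand-in for a real Client (with kafka-ffi you would call `client.poll` and `client.outq_len` the same way on a live handle):

```ruby
# FakeClient simulates a client with a number of outstanding items; each
# poll "serves" one of them, the way rd_kafka_poll fires pending callbacks.
class FakeClient
  def initialize(pending)
    @pending = pending
  end

  # Mirrors Client#outq_len: items still queued.
  def outq_len
    @pending
  end

  # Mirrors Client#poll: serve queued events/callbacks.
  def poll(timeout: 250)
    @pending -= 1 if @pending > 0
  end
end

# Poll until the outbound queue drains, with a cap so shutdown cannot hang
# forever if the broker never acknowledges.
def drain(client, max_polls: 1000)
  max_polls.times do
    return true if client.outq_len.zero?

    client.poll(timeout: 100)
  end

  false
end
```

The cap is a judgment call: librdkafka itself will not force the queue to zero, so an application has to decide how long a graceful shutdown may take.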
#pause_partitions(list) ⇒ TopicPartitionList
Pause producing or consuming of the provided list of partitions. The list is updated to include any errors.
# File 'lib/kafka/ffi/client.rb', line 225

def pause_partitions(list)
  err = ::Kafka::FFI.rd_kafka_pause_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
#poll(timeout: 250) ⇒ Integer
Do not call in a Consumer after poll_set_consumer has been called.
Polls for events on the Client, causing callbacks to be fired. This is used by both the Producer and Consumer to ensure callbacks are processed in a timely manner.
Pass a timeout of -1 to wait indefinitely for an event.
# File 'lib/kafka/ffi/client.rb', line 212

def poll(timeout: 250)
  ::Kafka::FFI.rd_kafka_poll(self, timeout)
end
#query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range
Query the broker for the oldest and newest offsets for the partition.
# File 'lib/kafka/ffi/client.rb', line 304

def query_watermark_offsets(topic, partition, timeout: 1000)
  low  = ::FFI::MemoryPointer.new(:int64)
  high = ::FFI::MemoryPointer.new(:int64)

  err = ::Kafka::FFI.rd_kafka_query_watermark_offsets(self, topic, partition, low, high, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Range.new(low.read_int64, high.read_int64, false)
end
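Since the method returns a plain Ruby Range of low..high offsets, common follow-on arithmetic (retained backlog, consumer lag) is ordinary integer math. A sketch with hard-coded stand-in values for what a live broker query would return; `committed` is a hypothetical committed offset, not something this Client API provides:

```ruby
# What query_watermark_offsets returns: an inclusive Range from the oldest
# retained offset to the next offset to be written.
watermarks = Range.new(250, 1000, false)

# Hypothetical committed offset for some consumer group on this partition.
committed = 700

# Messages currently retained in the partition.
backlog = watermarks.max - watermarks.min

# Messages the group has not yet read.
lag = watermarks.max - committed
```

Checking `watermarks.cover?(committed)` before computing lag is a cheap sanity test, since a committed offset outside the watermarks usually means the data was deleted by retention.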
#resume_partitions(list) ⇒ TopicPartitionList
Resume producing or consuming of the provided list of partitions.
# File 'lib/kafka/ffi/client.rb', line 242

def resume_partitions(list)
  err = ::Kafka::FFI.rd_kafka_resume_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
#set_log_queue(dest) ⇒ Object
Forward librdkafka and debug logs to the specified queue. This allows the application to serve log callbacks in its thread of choice.
# File 'lib/kafka/ffi/client.rb', line 285

def set_log_queue(dest)
  err = ::Kafka::FFI.rd_kafka_set_log_queue(self, dest)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
#topic(name, config = nil) ⇒ Topic
The returned Topic is owned by the Client and will be destroyed when the Client is destroyed.
Create or fetch the Topic with the given name. The first time topic is called for a given name, a configuration can be passed for the topic.
# File 'lib/kafka/ffi/client.rb', line 179

def topic(name, config = nil)
  topic = @topics[name]
  if topic
    if config
      # Make this an exception because it's probably a programmer error
      # that _should_ primarily happen during development due to
      # misunderstanding the semantics.
      raise ::Kafka::FFI::TopicAlreadyConfiguredError, "#{name} was already configured"
    end

    return topic
  end

  topic = ::Kafka::FFI.rd_kafka_topic_new(self, name, config)
  if topic.nil?
    raise ::Kafka::ResponseError, ::Kafka::FFI.rd_kafka_last_error
  end

  @topics[name] = topic
  topic
end
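The cache semantics here are first-call-wins: a config is honored only the first time a name is seen, and passing one again raises rather than silently ignoring it. A stand-alone illustration of that contract, where `TopicCache` and its error class are hypothetical stand-ins rather than kafka-ffi API:

```ruby
# First-call-wins topic cache mirroring Client#topic: configure on first
# fetch, return the cached object afterwards, and treat a second config as
# a programmer error instead of silently discarding it.
class TopicCache
  AlreadyConfigured = Class.new(StandardError)

  def initialize
    @topics = {}
  end

  def fetch(name, config = nil)
    if (topic = @topics[name])
      raise AlreadyConfigured, "#{name} was already configured" if config

      return topic
    end

    # Stands in for rd_kafka_topic_new; real code would build a Topic here.
    @topics[name] = { name: name, config: config }
  end
end
```

Raising instead of ignoring is the safer design: a silently dropped config would leave the topic running with settings the caller believes were applied.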