Class: Kafka::FFI::Client

Inherits:
OpaquePointer show all
Defined in:
lib/kafka/ffi/client.rb

Overview

Note:

Naming this is hard and librdkafka primarily just refers to it as “a handle” to an instance. It’s more akin to an internal service and this Client talks the API of that service.

Client is a handle to a configured librdkafka instance that begins operation once created. Client is an abstract class and will provide either a Consumer or Producer based on the type being created. Each Client instance can either produce or consume messages to / from topics and cannot do both.

See Also:

Direct Known Subclasses

Consumer, Producer

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from OpaquePointer

by_ref, inherited, to_native

Constructor Details

#initialize(ptr) ⇒ Client

Returns a new instance of Client.



# File 'lib/kafka/ffi/client.rb', line 101

def initialize(ptr)
  super(ptr)

  # Caches Topics created on the first call to #topic below. Topics need to
  # be destroyed before destroying the Client. We keep a set in the client
  # so end users don't need to think about it.
  @topics = {}
end

Class Method Details

.from_native(ptr, _ctx) ⇒ Object



# File 'lib/kafka/ffi/client.rb', line 69

def self.from_native(ptr, _ctx)
  if !ptr.is_a?(::FFI::Pointer)
    raise TypeError, "from_native can only convert from a ::FFI::Pointer to #{self}"
  end

  # Converting from a null pointer should return nil. Likely this was
  # caused by rd_kafka_new returning an error and a NULL pointer for the
  # Client.
  if ptr.null?
    return nil
  end

  # Build a temporary Client to pass to rd_kafka_type. There is a bit of a
  # chicken and egg problem here. We can't create the final class until
  # after we know the type. But for type safety we want to pass a Client.
  cfg = allocate
  cfg.send(:initialize, ptr)
  type = ::Kafka::FFI.rd_kafka_type(cfg)

  klass =
    case type
    when :producer then Producer
    when :consumer then Consumer
    else
      raise ArgumentError, "unknown Kafka client type: #{type}"
    end

  client = klass.allocate
  client.send(:initialize, ptr)
  client
end

.new(type, config = nil) ⇒ Consumer, Producer

Create a new Client of type with the given configuration.

Parameters:

  • type (:consumer, :producer)

    Type of Kafka instance to create.

  • config (nil) (defaults to: nil)

    Use librdkafka default config

  • config (Config, Kafka::Config) (defaults to: nil)

    Configuration for the instance.

  • config (Hash{[String, Symbol] => [String, Integer, nil, Boolean]}) (defaults to: nil)

    Configuration options for the instance.

Returns:



# File 'lib/kafka/ffi/client.rb', line 34

def self.new(type, config = nil)
  error = ::FFI::MemoryPointer.new(:char, 512)

  # Convenience for passing in a Kafka::Config instead of building a
  # Kafka::FFI::Config since Kafka::Config provides a way to create a
  # config from a Hash.
  config =
    case config
    when Config, nil     then config
    when ::Kafka::Config then config.to_ffi
    when Hash            then ::Kafka::Config.new(config).to_ffi
    else
      raise ArgumentError, "config must be one of nil, Config, ::Kafka::Config, or Hash"
    end

  client = Kafka::FFI.rd_kafka_new(type, config, error, error.size)
  if client.nil?
    raise Error, error.read_string
  end

  if config
    # Store a reference to the config on the Client instance. We do this to
    # tie the Config's lifecycle to the Client instance in Ruby since they
    # are already tied in librdkafka. This ensures that any Ruby objects
    # referenced in the config (like callbacks) are not garbage collected.
    #
    # Using instance_variable_set to avoid exposing an API method that
    # could cause confusion from end users since the config cannot be
    # changed after initialization.
    client.instance_variable_set(:@config, config)
  end

  client
end
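A minimal usage sketch (assuming the kafka-ffi gem is loaded via `require "kafka"` and a broker is reachable at `localhost:9092`; the config keys are standard librdkafka options):

```ruby
require "kafka"

# Build a :consumer handle from a Hash config. The Hash is converted through
# Kafka::Config to a Kafka::FFI::Config before being passed to rd_kafka_new.
client = Kafka::FFI::Client.new(:consumer, {
  "bootstrap.servers" => "localhost:9092",
  "group.id"          => "example-group",
})

# .new returns a type-specific subclass, here a Kafka::FFI::Consumer.

# Release the handle when done with it.
client.destroy
```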

Instance Method Details

#alter_configs(resources, options: nil, timeout: 5000) ⇒ nil, Array<Admin::ConfigResource>

Note:

AlterConfigs will replace all existing configuration for the given resources, reverting all unspecified config options to their default values.

Note:

At most one :broker type ConfigResource can be specified per call to alter_configs since the changes must be sent to the broker specified in the resource.

Update the configuration for the specified resources. Updates may succeed for a subset of the provided resources while others fail. The configuration for a particular resource is updated atomically, replacing values using the provided ConfigResource (set via set_config) and reverting any unspecified config options to their default values.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • resources (Admin::ConfigResource)

    Resource to alter configs for.

  • resources (Array<Admin::ConfigResource>)

    List of resources with their configs to update. At most one of type :broker is allowed per call.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:

See Also:

  • rd_kafka_AlterConfigs


# File 'lib/kafka/ffi/client.rb', line 573

def alter_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # AlterConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_AlterConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
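A hedged sketch of the calling pattern (the `ConfigResource.new(:topic, name)` constructor and the topic option are illustrative; only set_config is referenced above):

```ruby
# Describe the desired end-state configuration for the "events" topic. Any
# unspecified options will revert to their defaults.
resource = Kafka::FFI::Admin::ConfigResource.new(:topic, "events")
resource.set_config("retention.ms", "86400000")

result = client.alter_configs(resource, timeout: 5000)
if result
  # Inspect the returned resources for per-resource errors, then release.
  result.destroy
end
```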

#brokers_add(brokers) ⇒ Integer

Note:

It is preferred to set brokers through the `metadata.broker.list` or `bootstrap.servers` config options.

Adds one or more brokers to the Client’s list of initial bootstrap brokers. Additional brokers will be discovered automatically once the Client connects to a broker by querying the broker metadata.

Examples:

Add multiple brokers

client.brokers_add(["kafka_1:9092", "kafka_2:9092"])

Add a single broker with protocol

client.brokers_add("PLAINTEXT://localhost:9096")

Parameters:

  • brokers (String)

    Comma separated list of broker addresses to add.

  • brokers (Array<String>)

    Array of broker addresses to add.

Returns:

  • (Integer)

    number of brokers successfully added

See Also:

  • rd_kafka_brokers_add


# File 'lib/kafka/ffi/client.rb', line 430

def brokers_add(brokers)
  if brokers.is_a?(Array)
    brokers = brokers.join(",")
  end

  ::Kafka::FFI.rd_kafka_brokers_add(self, brokers)
end

#cluster_id(timeout: 1000) ⇒ nil, String

Note:

requires config `api.version.request` set to true

Retrieves the Client’s Cluster ID.

This is a non-blocking call that will return immediately if the metadata is locally cached.

Parameters:

  • timeout (Integer) (defaults to: 1000)

    Maximum time to wait in milliseconds. Use 0 for a non-blocking call.

Returns:

  • (nil)

    Cluster ID not available

  • (String)

    ID of the Cluster



# File 'lib/kafka/ffi/client.rb', line 136

def cluster_id(timeout: 1000)
  ptr = ::Kafka::FFI.rd_kafka_clusterid(self, timeout)
  if ptr.null?
    return nil
  end

  begin
    ptr.read_string
  ensure
    # Documentation explicitly says that the string needs to be freed.
    ::Kafka::FFI.rd_kafka_mem_free(self, ptr)
  end
end

#config ⇒ Config

Note:

The returned config is read-only and tied to the lifetime of the Client. Don’t try to modify or destroy the config.

Retrieve the current configuration used by the Client.

Returns:

  • (Config)

    Client’s current config. Read-only.



# File 'lib/kafka/ffi/client.rb', line 116

def config
  ::Kafka::FFI.rd_kafka_conf(self)
end

#controller_id(timeout: 1000) ⇒ Integer

Note:

requires config `api.version.request` set to true

Retrieves the current Controller ID as reported by broker metadata.

Parameters:

  • timeout (Integer) (defaults to: 1000)

    Maximum time to wait in milliseconds. Specify 0 for a non-blocking call.

Returns:

  • (Integer)

    controller broker id or -1 if no ID could be retrieved before the timeout.



# File 'lib/kafka/ffi/client.rb', line 159

def controller_id(timeout: 1000)
  ::Kafka::FFI.rd_kafka_controllerid(self, timeout)
end

#create_partitions(requests, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::TopicResult>

Create additional partition(s) for a topic on the cluster.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • requests (Admin::NewPartitions)

    Details about partitions to create and possibly broker assignments for those partitions.

  • requests (Array<Admin::NewPartitions>)

    List of partition details.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:



# File 'lib/kafka/ffi/client.rb', line 523

def create_partitions(requests, options: nil, timeout: 5000)
  requests = Array(requests)

  # NewPartitions wants an array of Admin::NewPartitions
  list = ::FFI::MemoryPointer.new(:pointer, requests.length)
  list.write_array_of_pointer(requests.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreatePartitions(self, list, requests.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end

#create_topics(topics, options: nil, timeout: 5000) ⇒ nil, Admin::Result<TopicResult>

Create topics in the cluster with the given configuration.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • topics (NewTopic, Array<NewTopic>)

    List of topics to create on the cluster.

  • timeout (Integer) (defaults to: 5000)

    Time in milliseconds to wait for a reply.

Returns:

  • (nil)

    Create timed out

  • (Admin::Result<TopicResult>)

    Response from the cluster with details about the creation of the list of topics or any errors.

Raises:



# File 'lib/kafka/ffi/client.rb', line 455

def create_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # CreateTopics wants an array of NewTopic pointers
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_CreateTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
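A brief sketch of creating a topic (the `Admin::NewTopic.new(name, partitions, replication_factor)` signature is assumed for illustration):

```ruby
# Request a 3-partition topic with replication factor 1.
topic = Kafka::FFI::Admin::NewTopic.new("events", 3, 1)

result = client.create_topics(topic, timeout: 10_000)
if result
  # result holds one TopicResult per requested topic; check each for errors.
  result.destroy
end
```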

#default_topic_conf_dupTopicConfig

Create a copy of the Client’s default topic configuration object. The caller is now responsible for ownership of the new config.

Returns:



393
394
395
# File 'lib/kafka/ffi/client.rb', line 393

def default_topic_conf_dup
  ::Kafka::FFI.rd_kafka_default_topic_conf_dup(self)
end

#delete_topics(topics, options: nil, timeout: 5000) ⇒ nil, Array<TopicResult>

Delete a list of Topics from the cluster.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • topics (DeleteTopic)

    List of topics to delete

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for the deletion to complete.

Returns:

  • (nil)

    Delete timed out

  • (Array<TopicResult>)

    Response from the cluster with details about the deletion of the list of topics or any errors.



# File 'lib/kafka/ffi/client.rb', line 488

def delete_topics(topics, options: nil, timeout: 5000)
  topics = Array(topics)

  # DeleteTopics wants an array of topics
  list = ::FFI::MemoryPointer.new(:pointer, topics.length)
  list.write_array_of_pointer(topics.map(&:pointer))

  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DeleteTopics(self, list, topics.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end
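Deletion mirrors create_topics (the `Admin::DeleteTopic.new(name)` constructor is assumed for illustration):

```ruby
request = Kafka::FFI::Admin::DeleteTopic.new("events")

result = client.delete_topics(request, timeout: 10_000)
# nil means the request timed out before a reply arrived.
result.destroy if result
```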

#describe_configs(resources, options: nil, timeout: 5000) ⇒ nil, Admin::Result<Admin::ConfigResource>

Get configuration for the specified resources.

Application is responsible for calling #destroy on the returned results when done with the results.

Parameters:

  • resources (Admin::ConfigResource)

    Resource to request configuration details for.

  • resources (Array<Admin::ConfigResource>)

    List of resources to get config details for.

  • options (Admin::AdminOptions) (defaults to: nil)

    Admin API request options

  • timeout (Integer) (defaults to: 5000)

    Time to wait in milliseconds for request to complete.

Returns:

See Also:

  • rd_kafka_DescribeConfigs


# File 'lib/kafka/ffi/client.rb', line 612

def describe_configs(resources, options: nil, timeout: 5000)
  resources = Array(resources)

  # DescribeConfigs wants an array of Admin::ConfigResource pointers
  list = ::FFI::MemoryPointer.new(:pointer, resources.length)
  list.write_array_of_pointer(resources.map(&:pointer))

  # Queue to receive the result
  queue = ::Kafka::FFI::Queue.new(self)

  ::Kafka::FFI.rd_kafka_DescribeConfigs(self, list, resources.length, options, queue)

  event = queue.poll(timeout: timeout)
  if event
    ::Kafka::FFI::Admin::Result.new(event)
  end
ensure
  list.free
  queue.destroy
end

#destroyObject

Release all of the resources used by this Client. This may block until the instance has finished its shutdown procedure. Always make sure to destroy any associated resources and cleanly shut down the instance before calling destroy.



# File 'lib/kafka/ffi/client.rb', line 637

def destroy
  if !pointer.null?
    # Clean up any cached topics before destroying the Client.
    @topics.each do |_, topic|
      ::Kafka::FFI.rd_kafka_topic_destroy(topic)
    end
    @topics.clear

    ::Kafka::FFI.rd_kafka_destroy(self)
  end
end

#get_background_queue ⇒ Queue?

Note:

The returned Queue must not be polled, forwarded, or otherwise managed by the application. It may only be used as the destination queue passed to queue-enabled APIs.

Note:

The caller must call #destroy on the Queue when finished with it

Get a reference to the background thread queue. The background queue is automatically polled by librdkafka and is fully managed internally.

Returns:

  • (Queue)

    Background queue

  • (nil)

    Background queue is disabled



# File 'lib/kafka/ffi/client.rb', line 274

def get_background_queue
  ::Kafka::FFI.rd_kafka_queue_get_background(self)
end

#get_main_queue ⇒ Queue

Note:

Application must call #destroy on this queue when finished.

Get a reference to the main librdkafka event queue. This is the queue that is served by rd_kafka_poll.

Returns:

  • (Queue)

    Main client Event queue



# File 'lib/kafka/ffi/client.rb', line 259

def get_main_queue
  ::Kafka::FFI.rd_kafka_queue_get_main(self)
end

#group_list(group: nil, timeout: 1000) ⇒ Kafka::FFI::GroupList

Note:

Application must call #destroy to release the list when done

List and describe client groups in the cluster.

Returns:

Raises:



# File 'lib/kafka/ffi/client.rb', line 376

def group_list(group: nil, timeout: 1000)
  list = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_list_groups(self, group, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  GroupList.new(list.read_pointer)
ensure
  list.free
end
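A usage sketch (how the group descriptions are read off the GroupList is an assumption; only #destroy is documented above):

```ruby
# Fetch every group in the cluster; pass group: "name" to describe one group.
list = client.group_list(timeout: 2000)
begin
  # ... inspect the group descriptions on the list ...
ensure
  # The list must be released when done.
  list.destroy
end
```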

#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata

Retrieve metadata from the Kafka cluster

Parameters:

  • local_only (Boolean) (defaults to: false)

    Only request info about locally known topics, don’t query all topics in the cluster.

  • topic (String, Topic) (defaults to: nil)

    Only request info about this topic.

  • timeout (Integer) (defaults to: 1000)

    Request timeout in milliseconds

Returns:

  • (Metadata)

    Details about the state of the cluster.

Raises:



# File 'lib/kafka/ffi/client.rb', line 350

def metadata(local_only: false, topic: nil, timeout: 1000)
  ptr = ::FFI::MemoryPointer.new(:pointer)

  # Need to use a Topic reference if asking for only information about a
  # single topic.
  if topic.is_a?(String)
    topic = self.topic(topic)
  end

  err = ::Kafka::FFI.rd_kafka_metadata(self, local_only, topic, ptr, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Kafka::FFI::Metadata.new(ptr.read_pointer)
ensure
  ptr.free
end
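A usage sketch (assuming the returned Metadata exposes #destroy for releasing the underlying struct):

```ruby
# Ask only about the "events" topic; a String is converted to a Topic
# reference internally via #topic.
md = client.metadata(topic: "events", timeout: 2000)
begin
  # ... read broker, topic, and partition details from md ...
ensure
  md.destroy
end
```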

#name ⇒ String

Retrieve the Kafka handle name.

Returns:

  • (String)

    handle / client name



# File 'lib/kafka/ffi/client.rb', line 123

def name
  ::Kafka::FFI.rd_kafka_name(self)
end

#offsets_for_times(list, timeout: 1000) ⇒ TopicPartitionList

Look up the offsets for the given partitions by timestamp. The offset for each partition will be the earliest offset whose timestamp is greater than or equal to the timestamp set in the TopicPartitionList.

Parameters:

  • list (TopicPartitionList)

    List of TopicPartitions to fetch offsets for. The TopicPartitions in the list will be modified based on the results of the query.

Returns:

Raises:



# File 'lib/kafka/ffi/client.rb', line 327

def offsets_for_times(list, timeout: 1000)
  if list.nil?
    raise ArgumentError, "list cannot be nil"
  end

  err = ::Kafka::FFI.rd_kafka_offsets_for_times(self, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end
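A hedged sketch of resolving the offset closest to one hour ago (the TopicPartitionList constructor and setters shown here are assumptions, not documented in this section):

```ruby
# Timestamps are supplied in milliseconds via the offset field of each entry.
list = Kafka::FFI::TopicPartitionList.new(1)
list.add("events", 0)
list.set_offset("events", 0, (Time.now.to_i - 3600) * 1000)

# On return, each TopicPartition's offset is the earliest offset whose
# timestamp is >= the requested timestamp.
client.offsets_for_times(list, timeout: 2000)
```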

#outq_len ⇒ Integer Also known as: out_queue_len

Returns the current length of the outbound queue. This is the sum of several factors including outbound messages, pending callbacks, waiting acknowledgements, etc…

An application should wait for the return value to reach 0 before terminating to make sure outstanding messages, requests, callbacks, and events are fully processed.

Returns:

  • (Integer)

    Number of outbound items still pending



# File 'lib/kafka/ffi/client.rb', line 406

def outq_len
  ::Kafka::FFI.rd_kafka_outq_len(self)
end
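The drain-before-terminating pattern described above might look like:

```ruby
# Serve callbacks until all outbound messages, acknowledgements, and events
# have been handled, then release the handle.
while client.outq_len > 0
  client.poll(timeout: 100)
end
client.destroy
```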

#pause_partitions(list) ⇒ TopicPartitionList

Pause producing or consuming of the provided list of partitions. The list is updated to include any errors.

Parameters:

Returns:

  • (TopicPartitionList)

    List of partitions with errors set for any of the TopicPartitions that failed.

Raises:



# File 'lib/kafka/ffi/client.rb', line 225

def pause_partitions(list)
  err = ::Kafka::FFI.rd_kafka_pause_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end

#poll(timeout: 250) ⇒ Integer

Note:

Do not call in a Consumer after poll_set_consumer has been called.

Polls for events on the Client, causing callbacks to be fired. This is used by both the Producer and Consumer to ensure callbacks are processed in a timely manner.

Parameters:

  • timeout (Integer) (defaults to: 250)

    Time in milliseconds to wait for an event. 0 - Non-blocking call, returning immediately if there are no events. -1 - Wait indefinitely for an event.

Returns:

  • (Integer)

    Number of events served



# File 'lib/kafka/ffi/client.rb', line 212

def poll(timeout: 250)
  ::Kafka::FFI.rd_kafka_poll(self, timeout)
end
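A typical event loop (the `shutdown_requested` flag is a hypothetical application-side signal):

```ruby
until shutdown_requested
  # Fires any pending callbacks; returns the number of events served.
  client.poll(timeout: 250)
end
```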

#query_watermark_offsets(topic, partition, timeout: 1000) ⇒ Range

Query the broker for the oldest and newest offsets for the partition.

Parameters:

  • topic (String)

    Name of the topic to get offsets for

  • partition (Integer)

    Partition of the topic to get offsets for

Returns:

  • (Range)

    Range of known offsets

Raises:



# File 'lib/kafka/ffi/client.rb', line 304

def query_watermark_offsets(topic, partition, timeout: 1000)
  low  = ::FFI::MemoryPointer.new(:int64)
  high = ::FFI::MemoryPointer.new(:int64)

  err = ::Kafka::FFI.rd_kafka_query_watermark_offsets(self, topic, partition, low, high, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  Range.new(low.read_int64, high.read_int64, false)
end
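Since the Range is built inclusively from the low and high watermarks, a partition's backlog can be estimated from its bounds (the topic name is illustrative):

```ruby
offsets = client.query_watermark_offsets("events", 0, timeout: 2000)

# Low watermark = oldest available offset; high watermark = next offset to be
# assigned, so their difference approximates the partition's message count.
backlog = offsets.last - offsets.first
```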

#resume_partitions(list) ⇒ TopicPartitionList

Resume producing or consuming of the provided list of partitions.

Parameters:

Returns:

  • (TopicPartitionList)

    List of partitions with errors set for any of the TopicPartitions that failed.

Raises:



249
# File 'lib/kafka/ffi/client.rb', line 242

def resume_partitions(list)
  err = ::Kafka::FFI.rd_kafka_resume_partitions(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  list
end

#set_log_queue(dest) ⇒ Object

Forward librdkafka and debug logs to the specified queue. This allows the application to serve log callbacks in its thread of choice.

Parameters:

  • dest (Queue)

    Destination Queue for logs

  • dest (nil)

    Forward logs to the Client’s main queue

Raises:



# File 'lib/kafka/ffi/client.rb', line 285

def set_log_queue(dest)
  err = ::Kafka::FFI.rd_kafka_set_log_queue(self, dest)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#topic(name, config = nil) ⇒ Topic

Note:

The returned Topic is owned by the Client and will be destroyed when the Client is destroyed.

Create or fetch the Topic with the given name. The first time topic is called for a given name, a configuration can be passed for the topic.

Parameters:

  • name (String)

    Name of the topic

  • config (TopicConfig, nil) (defaults to: nil)

Config options for the topic. This can only be passed for the first call of `topic` per topic name since a Topic can only be configured at creation.

Returns:

  • (Topic)

    Topic instance

Raises:



# File 'lib/kafka/ffi/client.rb', line 179

def topic(name, config = nil)
  topic = @topics[name]
  if topic
    if config
      # Make this an exception because it's probably a programmer error
      # that _should_ primarily happen during development due to
      # misunderstanding the semantics.
      raise ::Kafka::FFI::TopicAlreadyConfiguredError, "#{name} was already configured"
    end

    return topic
  end

  topic = ::Kafka::FFI.rd_kafka_topic_new(self, name, config)
  if topic.nil?
    raise ::Kafka::ResponseError, ::Kafka::FFI.rd_kafka_last_error
  end

  @topics[name] = topic
  topic
end
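A usage sketch of the caching behavior (the `topic_config` variable stands in for a hypothetical TopicConfig built elsewhere):

```ruby
# The first call creates and caches the Topic; later calls return the same
# cached instance.
events = client.topic("events")
client.topic("events").equal?(events)

# Passing a config for an already-created topic raises.
begin
  client.topic("events", topic_config)
rescue Kafka::FFI::TopicAlreadyConfiguredError
  # "events" was already created above without this config
end
```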