Class: Kafka::FFI::TopicPartitionList

Inherits:
FFI::Struct
  • Object
Defined in:
lib/kafka/ffi/topic_partition_list.rb

Class Method Details

.new(count = 0) ⇒ TopicPartitionList

Initializes a new TopicPartitionList with an initial capacity to hold `count` items.

Parameters:

  • count (Integer) (defaults to: 0)

    Initial capacity

Returns:

  • (TopicPartitionList)



# File 'lib/kafka/ffi/topic_partition_list.rb', line 20

def self.new(count = 0)
  # Handle initialization through FFI. This will be called by
  # rd_kafka_topic_partition_list_new.
  if count.is_a?(::FFI::Pointer)
    return super(count)
  end

  ::Kafka::FFI.rd_kafka_topic_partition_list_new(count)
end
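
The two paths through `.new` (wrapping an existing pointer versus allocating a fresh list) can be illustrated without librdkafka. The following is a minimal sketch, assuming a stand-in `FakePointer` type and a hypothetical `fake_native_alloc` method in place of `::FFI::Pointer` and `rd_kafka_topic_partition_list_new`; none of these names exist in the library.

```ruby
# Stand-in for ::FFI::Pointer; hypothetical, for illustration only.
FakePointer = Struct.new(:address)

class SketchList
  attr_reader :pointer

  def self.new(count = 0)
    # Wrapping path: an existing pointer goes to the regular constructor.
    return super(count) if count.is_a?(FakePointer)

    # Allocation path: a native call would allocate the list and return a
    # pointer, which is then wrapped by re-entering .new.
    new(fake_native_alloc(count))
  end

  # Hypothetical replacement for the native allocation call.
  def self.fake_native_alloc(count)
    FakePointer.new(0x1000 + count)
  end

  def initialize(pointer)
    @pointer = pointer
  end
end

allocated = SketchList.new(4)
wrapped = SketchList.new(FakePointer.new(0x2000))
```

Both calls yield a `SketchList`; only the origin of the wrapped pointer differs, which mirrors the comment in the source about `.new` being called again during initialization through FFI.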

Instance Method Details

#add(topic, partition = -1) ⇒ TopicPartition

Add a topic + partition combination to the list

Parameters:

  • topic (String)

    Name of the topic to add

  • partition (Integer) (defaults to: -1)

    Partition of the topic to add to the list. Pass -1 to add all partitions of the topic to the list.

Returns:

  • (TopicPartition)



# File 'lib/kafka/ffi/topic_partition_list.rb', line 51

def add(topic, partition = -1)
  ::Kafka::FFI.rd_kafka_topic_partition_list_add(self, topic.to_s, partition)
end

#add_range(topic, range_or_lower, upper = nil) ⇒ Object

Add a range of TopicPartitions to the list.

Parameters:

  • topic (String)

    Name of the topic to add

  • range_or_lower (Range, Integer)

    Either a Range specifying the Range of partitions or the lower bound for the range. When providing a Range any value for upper is ignored.

  • upper (Integer, nil) (defaults to: nil)

    The upper bound of the set of partitions (inclusive). Required unless range_or_lower is a Range.



# File 'lib/kafka/ffi/topic_partition_list.rb', line 63

def add_range(topic, range_or_lower, upper = nil)
  lower = range_or_lower

  # Allows passing a Range for convenience.
  if range_or_lower.is_a?(Range)
    lower = range_or_lower.min
    upper = range_or_lower.max
  elsif upper.nil?
    raise ArgumentError, "upper was nil but must be provided when lower is not a Range"
  end

  ::Kafka::FFI.rd_kafka_topic_partition_list_add_range(self, topic.to_s, lower.to_i, upper.to_i)
end
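
The bound handling in `add_range` can be tried in isolation. This is a minimal sketch that copies the normalization logic into a hypothetical helper, `normalize_bounds`, which is not part of the library and makes no native call; it only returns the lower and upper values that would be passed on.

```ruby
# Hypothetical helper mirroring the argument handling of add_range.
# Returns [lower, upper] as Integers.
def normalize_bounds(range_or_lower, upper = nil)
  lower = range_or_lower

  if range_or_lower.is_a?(Range)
    # A Range takes precedence; any upper argument is ignored.
    lower = range_or_lower.min
    upper = range_or_lower.max
  elsif upper.nil?
    raise ArgumentError, "upper was nil but must be provided when lower is not a Range"
  end

  [lower.to_i, upper.to_i]
end

inclusive = normalize_bounds(0..3)      # Range with inclusive end
exclusive = normalize_bounds(0...3)     # Range with exclusive end
explicit  = normalize_bounds(2, 5)      # separate lower and upper
ignored   = normalize_bounds(0..3, 99)  # upper is ignored for a Range
```

Because `Range#max` is used, an exclusive Range such as `0...3` ends at partition 2, while the two-argument form treats `upper` as inclusive.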

#copy ⇒ TopicPartitionList

Duplicate the TopicPartitionList as a new TopicPartitionList that is identical to the current one.

Returns:

  • (TopicPartitionList)



# File 'lib/kafka/ffi/topic_partition_list.rb', line 103

def copy
  ::Kafka::FFI.rd_kafka_topic_partition_list_copy(self)
end

#del(topic, partition) ⇒ Boolean Also known as: delete

Remove a TopicPartition by topic and partition

Parameters:

  • topic (String)

    Name of the topic to remove

  • partition (Integer)

    Partition to remove

Returns:

  • (Boolean)

    True when the partition was found and removed



# File 'lib/kafka/ffi/topic_partition_list.rb', line 83

def del(topic, partition)
  ::Kafka::FFI.rd_kafka_topic_partition_list_del(self, topic.to_s, partition) == 1
end

#del_by_idx(idx) ⇒ Boolean Also known as: delete_by_index

Remove a TopicPartition by index

Parameters:

  • idx (Integer)

    Index in elements to remove

Returns:

  • (Boolean)

    True when the TopicPartition was found and removed



# File 'lib/kafka/ffi/topic_partition_list.rb', line 92

def del_by_idx(idx)
  ::Kafka::FFI.rd_kafka_topic_partition_list_del_by_idx(self, idx) == 1
end

#destroy ⇒ Object

Free all resources used by the list and the list itself. Usage depends on the semantics of librdkafka, so only call this on TopicPartitionLists that are not owned by other objects. Generally, if you constructed the list yourself, it should be safe to destroy.



# File 'lib/kafka/ffi/topic_partition_list.rb', line 161

def destroy
  if !null?
    ::Kafka::FFI.rd_kafka_topic_partition_list_destroy(self)
  end
end
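
One way to follow the "destroy what you constructed" guidance is to pair construction and destruction in a single helper. The sketch below is an assumption about usage, not a library feature: `FakeList` is a stand-in that only mimics `#destroy` and `#null?` (it tracks a freed flag, which the real struct may not do), and `with_list` is a hypothetical helper.

```ruby
# Stand-in for TopicPartitionList; hypothetical, for illustration only.
class FakeList
  def initialize
    @freed = false
  end

  def null?
    @freed
  end

  def destroy
    @freed = true unless null?
  end
end

# Hypothetical helper: yields a list the caller owns and always destroys
# it afterwards, even when the block raises.
def with_list(list = FakeList.new)
  yield list
ensure
  list.destroy
end

seen = nil
with_list { |list| seen = list }

failed = nil
begin
  with_list do |list|
    failed = list
    raise "boom"
  end
rescue RuntimeError
  # The error propagates, but the ensure clause has already run.
end
```

Lists handed out by other objects would not go through such a helper, since their owner remains responsible for freeing them.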

#elements ⇒ Array<TopicPartition>

Retrieves the set of TopicPartitions for the list.

Returns:

  • (Array<TopicPartition>)



# File 'lib/kafka/ffi/topic_partition_list.rb', line 151

def elements
  self[:cnt].times.map do |i|
    TopicPartition.new(self[:elems] + (i * TopicPartition.size))
  end
end
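
`elements` walks a contiguous array of fixed-size structs: element `i` starts at the base address plus `i` times the struct size. The same stride arithmetic can be shown in pure Ruby over a packed byte buffer. The record layout here (a 32-bit partition followed by a 64-bit offset) is a made-up assumption for illustration and is not the real layout of a TopicPartition.

```ruby
# Hypothetical fixed-size record: int32 partition + int64 offset,
# both little-endian, giving 12 bytes per record.
RECORD_FORMAT = "l<q<"
RECORD_SIZE = 12

records = [[0, 100], [1, 250], [2, 75]]

# Lay the records out back to back, like a C array of structs.
buffer = records.map { |record| record.pack(RECORD_FORMAT) }.join
count = records.size

# Element i lives at byte offset i * RECORD_SIZE from the start.
decoded = count.times.map do |i|
  buffer.byteslice(i * RECORD_SIZE, RECORD_SIZE).unpack(RECORD_FORMAT)
end
```

In the real method, `self[:cnt]` plays the role of `count`, `self[:elems]` is the base pointer, and `TopicPartition.size` is the stride.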

#empty? ⇒ Boolean

Returns true when the TopicPartitionList is empty

Returns:

  • (Boolean)

    True when the list is empty



# File 'lib/kafka/ffi/topic_partition_list.rb', line 40

def empty?
  size == 0
end

#find(topic, partition) ⇒ TopicPartition?

Find the TopicPartition in the set for the given topic + partition. Will return nil if the list does not include the combination.

Parameters:

  • topic (String)

    Name of the topic

  • partition (Integer)

    Topic partition

Returns:

  • (TopicPartition, nil)

    The TopicPartition for the topic + partition combination, or nil if the list does not include it.



# File 'lib/kafka/ffi/topic_partition_list.rb', line 138

def find(topic, partition)
  result = ::Kafka::FFI.rd_kafka_topic_partition_list_find(self, topic, partition)

  if result.null?
    return nil
  end

  result
end

#set_offset(topic, partition, offset) ⇒ Integer

Set the consumed offset for topic and partition

Parameters:

  • topic (String)

    Name of the topic to set the offset for

  • partition (Integer)

    Partition to set the offset for

  • offset (Integer)

    Offset of the topic+partition to set

Returns:

  • (Integer)

    0 for success otherwise rd_kafka_resp_err_t code



# File 'lib/kafka/ffi/topic_partition_list.rb', line 114

def set_offset(topic, partition, offset)
  ::Kafka::FFI.rd_kafka_topic_partition_list_set_offset(self, topic, partition, offset)
end

#size ⇒ Integer

Returns the number of elements in the TopicPartitionList.

Returns:

  • (Integer)

    Number of elements



# File 'lib/kafka/ffi/topic_partition_list.rb', line 33

def size
  self[:cnt]
end

#sort(&block) ⇒ Object

Sort the TopicPartitionList. Sort can take a block that implements a standard comparison function, returning -1, 0, or 1 depending on whether the left argument is less than, equal to, or greater than the right argument.

Examples:

Custom sorting function

sort do |left, right|
  left.partition <=> right.partition
end


# File 'lib/kafka/ffi/topic_partition_list.rb', line 126

def sort(&block)
  ::Kafka::FFI.rd_kafka_topic_partition_list_sort(self, block, nil)
end
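
A comparison block of the shape described above can be checked with plain Ruby before handing it to `sort`. This sketch uses a hypothetical `TP` Struct as a stand-in for TopicPartition and Ruby's own `Array#sort`, so it exercises only the comparator contract, not the native sort.

```ruby
# Stand-in for TopicPartition; hypothetical, for illustration only.
TP = Struct.new(:topic, :partition)

list = [
  TP.new("orders", 2),
  TP.new("events", 1),
  TP.new("orders", 0),
]

# Compare by topic first, then by partition. Array#<=> returns -1, 0, or 1
# for these element types, which matches the expected contract.
by_topic_then_partition = lambda do |left, right|
  [left.topic, left.partition] <=> [right.topic, right.partition]
end

sorted = list.sort(&by_topic_then_partition)
```

Comparing arrays of keys keeps multi-key orderings short and guarantees that equal elements compare as 0.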