Class: Kafka::FFI::Consumer

Inherits:
Client
Defined in:
lib/kafka/ffi/consumer.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary

Instance Method Summary

Methods inherited from Client

#alter_configs, #brokers_add, #cluster_id, #config, #controller_id, #create_partitions, #create_topics, #default_topic_conf_dup, #delete_topics, #describe_configs, #destroy, from_native, #get_background_queue, #get_main_queue, #group_list, #initialize, #metadata, #name, #offsets_for_times, #outq_len, #pause_partitions, #poll, #query_watermark_offsets, #resume_partitions, #set_log_queue, #topic

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::Client

Class Method Details

.new(config = nil) ⇒ Object



# File 'lib/kafka/ffi/consumer.rb', line 10

def self.new(config = nil)
  super(:consumer, config)
end

Instance Method Details

#assign(list) ⇒ Object

Atomically assign the set of partitions to consume. This will replace the existing assignment.

Parameters:

  • list (TopicPartitionList)

    Set of topic+partitions to consume, replacing the current assignment.

Raises:

  • (Kafka::ResponseError)

See Also:

  • rd_kafka_assign for semantics on use from callbacks and how empty vs NULL lists affect internal state.


# File 'lib/kafka/ffi/consumer.rb', line 171

def assign(list)
  err = ::Kafka::FFI.rd_kafka_assign(self, list)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#assignment ⇒ Hash{String => Array<Integer>} Also known as: assignments

List the current partition assignment(s) for the consumer.

Returns:

  • (Hash{String => Array<Integer>})

    Current assignments for the consumer. Hash keys are topic names and values are the list of assigned partitions.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 188

def assignment
  ptr = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_assignment(self, ptr)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  begin
    tpl = ::Kafka::FFI::TopicPartitionList.new(ptr.read_pointer)

    # { "topic" => [1, 2, 3] }
    tpl.elements.inject({}) do |memo, tp|
      (memo[tp.topic] ||= []) << tp.partition
      memo
    end
  ensure
    tpl.destroy
  end
ensure
  ptr.free
end
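A quick illustration of the Hash that #assignment builds. The Assignment struct here is a stand-in for Kafka::FFI::TopicPartition (not part of the gem); the inject is the same grouping pattern used in the method body.

```ruby
# Stand-in for Kafka::FFI::TopicPartition elements.
Assignment = Struct.new(:topic, :partition)

elements = [
  Assignment.new("events", 0),
  Assignment.new("events", 1),
  Assignment.new("signals", 3),
]

# Group assigned partition ids by topic name, as #assignment does.
assignments = elements.inject({}) do |memo, tp|
  (memo[tp.topic] ||= []) << tp.partition
  memo
end

assignments # => { "events" => [0, 1], "signals" => [3] }
```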

#close ⇒ Object

Note:

Maximum blocking time is roughly limited to `session.timeout.ms`.

Close down the consumer. This will block until the consumer has revoked its assignment(s), committed offsets, and left the consumer group. The maximum blocking time is roughly limited to the `session.timeout.ms` config option.

Ensure that `destroy` is called after the consumer is closed to free up resources.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 333

def close
  err = ::Kafka::FFI.rd_kafka_consumer_close(self)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#commit(offsets, async) ⇒ Object

Commit the set of offsets from the given TopicPartitionList.

Parameters:

  • offsets (TopicPartitionList)

    Set of topic+partition with offset (and maybe metadata) to be committed. If offsets is nil the current partition assignment set will be used instead.

  • async (Boolean)

    If async is false this operation will block until the broker offset commit is done.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 294

def commit(offsets, async)
  err = ::Kafka::FFI.rd_kafka_commit(self, offsets, async)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#commit_message(message, async) ⇒ Object

Commit the message’s offset on the broker for the message’s partition.

Parameters:

  • message (Message)

    The message to commit as processed

  • async (Boolean)

    True to allow commit to happen in the background.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 309

def commit_message(message, async)
  if message.nil? || message.null?
    raise ArgumentError, "message cannot be nil/null"
  end

  err = ::Kafka::FFI.rd_kafka_commit_message(self, message, async)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#committed(list, timeout: 1000) ⇒ TopicPartitionList

Retrieve committed offsets for topics + partitions. The offset field for each TopicPartition in list will be set to the stored offset or RD_KAFKA_OFFSET_INVALID in case there was no stored offset for that partition. The error field is set if there was an error with the TopicPartition.

Parameters:

  • list (TopicPartitionList)

    List of topic+partitions to fetch current offsets. The list will be updated to set the committed offset or error as appropriate.

  • timeout (Integer) (defaults to: 1000)

    Maximum time to wait in milliseconds

Returns:

  • (TopicPartitionList)

    The list passed in, updated with the committed offsets and any per-partition errors.
Raises:

  • (Kafka::ResponseError)

    Error with the request (likely a timeout). Errors with individual topic+partition combinations are set in the returned TopicPartitionList



# File 'lib/kafka/ffi/consumer.rb', line 230

def committed(list, timeout: 1000)
  if list.nil?
    raise ArgumentError, "list cannot be nil"
  end

  err = ::Kafka::FFI.rd_kafka_committed(self, list, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  # Return the list that was passed in as it should now be augmented with
  # the committed offsets and any errors fetching said offsets.
  list
end
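A sketch of inspecting the list returned by #committed. Each element carries the stored offset (or RD_KAFKA_OFFSET_INVALID, which is -1001 in librdkafka) plus a possible per-partition error. The Committed struct is an illustrative stand-in, not part of the gem.

```ruby
RD_KAFKA_OFFSET_INVALID = -1001

# Stand-in for a TopicPartition element returned by #committed.
Committed = Struct.new(:topic, :partition, :offset, :error)

list = [
  Committed.new("events", 0, 42, nil),
  Committed.new("events", 1, RD_KAFKA_OFFSET_INVALID, nil), # nothing stored
]

# Keep only partitions with a stored offset and no per-partition error.
stored = list.select { |tp| tp.error.nil? && tp.offset != RD_KAFKA_OFFSET_INVALID }
stored.map { |tp| [tp.topic, tp.partition, tp.offset] }
# => [["events", 0, 42]]
```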

#consumer_poll(timeout) {|message| ... } ⇒ Object

Poll the consumer’s queue for a waiting Message and yields that message. The yielded message must not be cached in the application as it becomes unusable once the block completes.

Parameters:

  • timeout (Integer)

    How long to wait for a message in milliseconds.

Yields:

  • (message)

Yield Parameters:

  • message (Message)

    Message received from Kafka. Application must not call #destroy as it is owned by the Consumer.

Returns:

  • Either nil or the result of the block

Raises:

  • (ArgumentError)

    consumer_poll was called without a block.

  • (Kafka::ResponseError)

    Error occurred while polling.



# File 'lib/kafka/ffi/consumer.rb', line 261

def consumer_poll(timeout)
  if !block_given?
    raise ArgumentError, "consumer_poll must be passed a block"
  end

  msg = ::Kafka::FFI.rd_kafka_consumer_poll(self, timeout.to_i)

  # No message was available
  if msg.null?
    return nil
  end

  begin
    if msg.error
      raise msg.error
    end

    yield(msg)
  ensure
    msg.destroy
  end
end
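A minimal stand-in illustrating the consumer_poll contract: the message is yielded to the block and destroyed afterwards, so the block must copy out any data it wants to keep. FakeMessage and FakeConsumer are illustrative, not part of the gem.

```ruby
# Stand-in for Kafka::FFI::Message.
FakeMessage = Struct.new(:payload) do
  def error; nil; end
  def destroy; @destroyed = true; end
  def destroyed?; !!@destroyed; end
end

class FakeConsumer
  def initialize(messages)
    @messages = messages
  end

  # Mirrors Consumer#consumer_poll: yield at most one message, then
  # destroy it even if the block raises.
  def consumer_poll(_timeout)
    raise ArgumentError, "consumer_poll must be passed a block" unless block_given?

    msg = @messages.shift
    return nil if msg.nil?

    begin
      yield(msg)
    ensure
      msg.destroy
    end
  end
end

msg = FakeMessage.new("hello")
consumer = FakeConsumer.new([msg])

payloads = []
consumer.consumer_poll(250) { |m| payloads << m.payload }

payloads       # => ["hello"]
msg.destroyed? # => true
```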

#get_consumer_queue ⇒ Queue

Note:

Caller must call #destroy when done with the Queue.

Returns a reference to the consume queue. This is the queue served by consumer_poll.

Returns:

  • (Queue)

    Consumer queue



# File 'lib/kafka/ffi/consumer.rb', line 61

def get_consumer_queue
  ::Kafka::FFI.rd_kafka_queue_get_consumer(self)
end

#get_partition_queue(topic, partition) ⇒ Queue

Note:

Caller must call #destroy when done with the Queue.

Returns a reference to the partition’s queue.

Returns:

  • (Queue)

    Partition Queue



# File 'lib/kafka/ffi/consumer.rb', line 71

def get_partition_queue(topic, partition)
  ::Kafka::FFI.rd_kafka_queue_get_partition(self, topic, partition)
end

#get_watermark_offsets(topic, partition) ⇒ Array<(Integer, Integer)>

Get the last known (cached) low and high offsets for the partition. This differs from query_watermark_offsets in that it does not query the brokers.

Parameters:

  • topic (String)

    Name of the topic

  • partition (Integer)

    Topic partition

Returns:

  • (Array<(Integer, Integer)>)

low and high offsets. If either is unknown, RD_KAFKA_OFFSET_INVALID is returned for that value.

Raises:

  • (Kafka::ResponseError)

See Also:

  • #query_watermark_offsets


# File 'lib/kafka/ffi/consumer.rb', line 41

def get_watermark_offsets(topic, partition)
  low  = ::FFI::MemoryPointer.new(:int64)
  high = ::FFI::MemoryPointer.new(:int64)

  err = ::Kafka::FFI.rd_kafka_get_watermark_offsets(self, topic, partition, low, high)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  [low.read_int64, high.read_int64]
end
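A small illustrative helper (not part of the gem) showing one common use of the [low, high] pair: computing consumer lag. RD_KAFKA_OFFSET_INVALID is -1001 in librdkafka.

```ruby
RD_KAFKA_OFFSET_INVALID = -1001

# Consumer lag: messages between the current position and the high
# watermark, or nil when the high watermark is not yet known.
def lag(high, current)
  return nil if high == RD_KAFKA_OFFSET_INVALID
  high - current
end

lag(1500, 1400)                    # => 100
lag(RD_KAFKA_OFFSET_INVALID, 1400) # => nil
```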

#member_id ⇒ String

Retrieve the Consumer's broker-assigned group member ID.

Returns:

  • (String)

    Broker-assigned group member ID, or nil if the consumer has not yet been assigned one.



# File 'lib/kafka/ffi/consumer.rb', line 17

def member_id
  id = ::Kafka::FFI.rd_kafka_memberid(self)

  if id.null?
    return nil
  end

  id.read_string
ensure
  ::Kafka::FFI.rd_kafka_mem_free(self, id)
end

#poll_set_consumer ⇒ Object

Note:

It is not permitted to call #poll after redirecting the main queue with poll_set_consumer.

Redirect the main event queue to the Consumer’s queue so the consumer doesn’t need to poll from it separately for event callbacks to fire.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 83

def poll_set_consumer
  err = ::Kafka::FFI.rd_kafka_poll_set_consumer(self)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end

#subscribe(topic, *rest) ⇒ Object

Subscribe the consumer to receive Messages for a set of topics. The current set of subscriptions will be replaced.

Examples:

Subscribe to multiple topics

client.subscribe("signals", "events", "changes")

Parameters:

  • topic (String, Array<String>)

    Topic name or list of topics to subscribe to.

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 102

def subscribe(topic, *rest)
  topics = [topic, rest].flatten

  tpl = TopicPartitionList.new(topics.length)
  topics.each do |t|
    tpl.add(t)
  end

  err = ::Kafka::FFI.rd_kafka_subscribe(self, tpl)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
ensure
  tpl.destroy
end
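A sketch of how #subscribe normalizes its arguments: a single name, several names, or an Array all flatten to the same topic set via the `[topic, rest].flatten` step in the method body. `normalize_topics` is an illustrative helper, not part of the gem.

```ruby
# Mirrors the argument normalization at the top of Consumer#subscribe.
def normalize_topics(topic, *rest)
  [topic, rest].flatten
end

normalize_topics("signals")             # => ["signals"]
normalize_topics("signals", "events")   # => ["signals", "events"]
normalize_topics(["signals", "events"]) # => ["signals", "events"]
```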

#subscription ⇒ Array<String> Also known as: subscriptions

List the current topic subscriptions for the consumer.

Returns:

  • (Array<String>)

    List of current subscribed topics

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 139

def subscription
  ptr = ::FFI::MemoryPointer.new(:pointer)

  err = ::Kafka::FFI.rd_kafka_subscription(self, ptr)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  begin
    tpl = ::Kafka::FFI::TopicPartitionList.new(ptr.read_pointer)

    # Map the topic partition list to topic names.
    tpl.elements.map(&:topic)
  ensure
    tpl.destroy
  end
ensure
  ptr.free
end

#unsubscribe ⇒ Object

Unsubscribe from the current subscription set (i.e. all current subscriptions).

Raises:

  • (Kafka::ResponseError)


# File 'lib/kafka/ffi/consumer.rb', line 124

def unsubscribe
  err = ::Kafka::FFI.rd_kafka_unsubscribe(self)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end