Class: Rdkafka::Consumer

Inherits: Object
Includes:
Enumerable
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/headers.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka brokers automatically assign partitions and load balance partitions over consumers that have the same :"group.id" set in their configuration.

To create a consumer, set up a Config and call #consumer on it. It is mandatory to set :"group.id" in the configuration.

Consumer implements Enumerable, so you can use each to consume messages, or for example each_slice to consume batches of messages.
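A minimal end-to-end sketch; the broker address, group id, and topic name are assumptions for illustration:

```ruby
require "rdkafka"

# :"group.id" is mandatory for consumers
config = Rdkafka::Config.new(
  :"bootstrap.servers" => "localhost:9092", # assumed broker address
  :"group.id"          => "example-group"
)

consumer = config.consumer
consumer.subscribe("events") # assumed topic name

# Enumerable: #each polls until the consumer is closed
consumer.each do |message|
  puts "Got #{message.topic}/#{message.partition} @ #{message.offset}"
end
```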

Defined Under Namespace

Classes: Headers, Message, Partition, TopicPartitionList

Instance Method Summary

Instance Method Details

#assign(list) ⇒ Object

Atomic assignment of partitions to consume

Parameters:

  • list (TopicPartitionList)

    The new partition assignment

Raises:

  • (RdkafkaError)

    When assigning fails

# File 'lib/rdkafka/consumer.rb', line 144

def assign(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
    if response != 0
      raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end
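A sketch of a manual assignment, assuming a topic named "events" with partitions 0 and 1. Note that manual assignment bypasses the consumer group's automatic partition balancing:

```ruby
# Build the list of partitions to consume from
list = Rdkafka::Consumer::TopicPartitionList.new
list.add_topic("events", [0, 1]) # assumed topic and partitions

# Atomically replace the consumer's current assignment
consumer.assign(list)
```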

#assignment ⇒ TopicPartitionList

Returns the current partition assignment.

Returns:

  • (TopicPartitionList)

    The current partition assignment

Raises:

  • (RdkafkaError)

    When getting the assignment fails

# File 'lib/rdkafka/consumer.rb', line 166

def assignment
  ptr = FFI::MemoryPointer.new(:pointer)
  response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  tpl = ptr.read_pointer

  if !tpl.null?
    begin
      Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
    ensure
      Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
    end
  end
ensure
  ptr.free
end

#close ⇒ nil

Close this consumer

Returns:

  • (nil)

# File 'lib/rdkafka/consumer.rb', line 22

def close
  return unless @native_kafka

  @closing = true
  Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
  Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
  @native_kafka = nil
end

#cluster_id ⇒ String?

Returns the ClusterId as reported in broker metadata.

Returns:

  • (String, nil)

# File 'lib/rdkafka/consumer.rb', line 281

def cluster_id
  Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka)
end

#commit(list = nil, async = false) ⇒ nil

Manually commit the current offsets of this consumer.

To use this, set enable.auto.commit to false to disable automatic triggering of commits.

If enable.auto.offset.store is set to true the offset of the last consumed message for every partition is used. If set to false you can use #store_offset to indicate when a message has been fully processed.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to commit

  • async (Boolean) (defaults to: false)

    Whether to commit async or wait for the commit to finish

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When committing fails

# File 'lib/rdkafka/consumer.rb', line 371

def commit(list=nil, async=false)
  if !list.nil? && !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list ? list.to_native_tpl : nil

  begin
    response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
    if response != 0
      raise Rdkafka::RdkafkaError.new(response)
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
  end
end
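A sketch of the manual-commit flow; broker address, group id, topic, and the `process` helper are assumptions, and `enable.auto.commit` is disabled so commits only happen when requested:

```ruby
config = Rdkafka::Config.new(
  :"bootstrap.servers"  => "localhost:9092", # assumed broker address
  :"group.id"           => "example-group",
  :"enable.auto.commit" => false
)
consumer = config.consumer
consumer.subscribe("events")

consumer.each do |message|
  process(message)  # hypothetical processing step
  consumer.commit   # synchronous commit of the current offsets
end
```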

#committed(list = nil, timeout_ms = 1200) ⇒ TopicPartitionList

Return the current committed offset per partition for this consumer group. The offset field of each requested partition will either be set to the stored offset or to -1001 if there was no stored offset for that partition.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for or nil to use the current subscription.

  • timeout_ms (Integer) (defaults to: 1200)

    The timeout for fetching this information.

Returns:

Raises:

  • (RdkafkaError)

    When getting the committed positions fails.


# File 'lib/rdkafka/consumer.rb', line 195

def committed(list=nil, timeout_ms=1200)
  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
    if response != 0
      raise Rdkafka::RdkafkaError.new(response)
    end
    TopicPartitionList.from_native_tpl(tpl)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#each {|message| ... } ⇒ nil

Poll for new messages and yield for each received one. Iteration will end when the consumer is closed.

If enable.partition.eof is turned on in the config this will raise an error when an eof is reached, so you probably want to disable that when using this method of iteration.

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling fails

# File 'lib/rdkafka/consumer.rb', line 430

def each
  loop do
    message = poll(250)
    if message
      yield(message)
    else
      if @closing
        break
      else
        next
      end
    end
  end
end
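Because Consumer includes Enumerable, batch consumption falls out of the standard library; a sketch using each_slice, where `process` is a hypothetical handler:

```ruby
# Consume in batches of 100 messages; each_slice builds on #each,
# so iteration still ends when the consumer is closed.
consumer.each_slice(100) do |batch|
  batch.each { |message| process(message) } # hypothetical processing step
  consumer.commit
end
```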

#lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or position (TODO). It is also possible to create one yourself, in this case you have to provide a list that already contains all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 100)

    The timeout for each query watermark call.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash containing all topics with the lag per partition

Raises:

  • (RdkafkaError)

    When querying the broker fails

# File 'lib/rdkafka/consumer.rb', line 257

def lag(topic_partition_list, watermark_timeout_ms=100)
  out = {}

  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
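The lag value per partition is simply the high watermark minus the stored offset. A sketch of that arithmetic in plain Ruby, with hypothetical watermark and offset values standing in for what the broker would return:

```ruby
# Hypothetical high watermarks and committed offsets per partition
high_watermarks   = { 0 => 100, 1 => 50 }
committed_offsets = { 0 => 80,  1 => 50 }

# Mirror of the calculation in #lag: lag = high watermark - committed offset
lag = committed_offsets.each_with_object({}) do |(partition, offset), out|
  out[partition] = high_watermarks[partition] - offset
end

lag # => {0=>20, 1=>0}
```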

#member_id ⇒ String?

Returns this client's broker-assigned group member id

This currently requires the high-level KafkaConsumer

Returns:

  • (String, nil)

# File 'lib/rdkafka/consumer.rb', line 290

def member_id
  Rdkafka::Bindings.rd_kafka_memberid(@native_kafka)
end

#pause(list) ⇒ nil

Pause consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to pause

Raises:

  • (RdkafkaTopicPartitionListError)

    When pausing fails

# File 'lib/rdkafka/consumer.rb', line 74

def pause(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)

    if response != 0
      list = TopicPartitionList.from_native_tpl(tpl)
      raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end
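A sketch of pausing and later resuming a single partition, assuming a topic named "events"; pausing is useful for applying backpressure without leaving the consumer group:

```ruby
# Temporarily stop fetching from partition 0 of "events"
list = Rdkafka::Consumer::TopicPartitionList.new
list.add_topic("events", [0]) # assumed topic and partition

consumer.pause(list)
# ... drain an internal queue, apply backpressure, etc. ...
consumer.resume(list)
```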

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

  • (RdkafkaError)

    When polling fails

# File 'lib/rdkafka/consumer.rb', line 395

def poll(timeout_ms)
  return unless @native_kafka

  message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if !message_ptr.nil? && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end
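A sketch of a single poll with a 250 ms timeout; unlike #each, this gives the caller control over what to do when no message arrives:

```ruby
# Poll once; returns nil if no message arrived within 250 ms
message = consumer.poll(250)

if message
  puts "#{message.topic}/#{message.partition} @ #{message.offset}: #{message.payload}"
else
  # No message within the timeout; decide whether to poll again or back off
end
```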

#query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Array(Integer, Integer)

Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for querying the broker

Returns:

  • (Array(Integer, Integer))

    The low and high watermark

Raises:

  • (RdkafkaError)

    When querying the broker fails

# File 'lib/rdkafka/consumer.rb', line 224

def query_watermark_offsets(topic, partition, timeout_ms=200)
  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
    @native_kafka,
    topic,
    partition,
    low,
    high,
    timeout_ms,
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first
ensure
  low.free
  high.free
end
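A sketch of querying the watermarks for one partition, assuming a topic named "events". The low watermark is the oldest available offset; the high watermark is the offset that will be assigned to the next message:

```ruby
# Returns the low and high watermark as two Integers
low, high = consumer.query_watermark_offsets("events", 0) # assumed topic/partition

puts "Partition 0 currently holds offsets #{low} up to #{high - 1}"
```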

#resume(list) ⇒ nil

Resume consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to resume

Raises:

  • (RdkafkaError)

    When resuming fails

# File 'lib/rdkafka/consumer.rb', line 100

def resume(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
    if response != 0
      raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#seek(message) ⇒ nil

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:

  • message (Message)

    The message to which to seek

Raises:

  • (RdkafkaError)

    When seeking fails

# File 'lib/rdkafka/consumer.rb', line 333

def seek(message)
  # rd_kafka_seek is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
    @native_kafka,
    message.topic,
    nil
  )
  response = Rdkafka::Bindings.rd_kafka_seek(
    native_topic,
    message.partition,
    message.offset,
    0 # timeout
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end

#store_offset(message) ⇒ nil

Store offset of a message to be used in the next commit of this consumer

When using this enable.auto.offset.store should be set to false in the config.

Parameters:

  • message (Message)

    The message whose offset will be stored

Raises:

  • (RdkafkaError)

    When storing the offset fails

# File 'lib/rdkafka/consumer.rb', line 303

def store_offset(message)
  # rd_kafka_offset_store is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
    @native_kafka,
    message.topic,
    nil
  )
  response = Rdkafka::Bindings.rd_kafka_offset_store(
    native_topic,
    message.partition,
    message.offset
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end
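A sketch of the manual offset-store flow; broker address, group id, topic, and the `process` helper are assumptions, and `enable.auto.offset.store` is disabled so an offset only becomes eligible for commit once the message is fully processed:

```ruby
config = Rdkafka::Config.new(
  :"bootstrap.servers"        => "localhost:9092", # assumed broker address
  :"group.id"                 => "example-group",
  :"enable.auto.offset.store" => false
)
consumer = config.consumer
consumer.subscribe("events")

consumer.each do |message|
  process(message)               # hypothetical processing step
  consumer.store_offset(message) # only now is the offset eligible for commit
end
```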

#subscribe(*topics) ⇒ nil

Subscribe to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When subscribing fails

# File 'lib/rdkafka/consumer.rb', line 38

def subscribe(*topics)
  # Create topic partition list with topics and no partition set
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)

  topics.each do |topic|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, topic, -1)
  end

  # Subscribe to topic partition list and check this was successful
  response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
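Since the method takes a splat of topic names, several topics can be subscribed in one call; a sketch with assumed topic names:

```ruby
# Kafka assigns partitions for both topics across the consumer group
consumer.subscribe("events", "audit-log") # assumed topic names
```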

#subscription ⇒ TopicPartitionList

Return the current subscription to topics and partitions

Returns:

  • (TopicPartitionList)

    The current subscription

Raises:

  • (RdkafkaError)

    When getting the subscription fails

# File 'lib/rdkafka/consumer.rb', line 122

def subscription
  ptr = FFI::MemoryPointer.new(:pointer)
  response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr)

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  native = ptr.read_pointer

  begin
    Rdkafka::Consumer::TopicPartitionList.from_native_tpl(native)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native)
  end
end

#unsubscribe ⇒ nil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When unsubscribing fails

# File 'lib/rdkafka/consumer.rb', line 60

def unsubscribe
  response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end