Method: Rdkafka::Consumer#pause

Defined in:
lib/rdkafka/consumer.rb

#pause(list) ⇒ nil

Pause producing or consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList) — the partitions to pause

Returns:

  • (nil)

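Raises:

  • (TypeError) — if list is not a TopicPartitionList
  • (Rdkafka::RdkafkaTopicPartitionListError) — if the native pause call returns a non-zero response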

[View source] [View on GitHub]

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rdkafka/consumer.rb', line 104

# Pauses the partitions named in +list+ by handing its native representation
# to the +rd_kafka_pause_partitions+ binding.
#
# @param list [TopicPartitionList] the partitions to pause
# @return [nil]
# @raise [TypeError] if +list+ is not a TopicPartitionList
# @raise [Rdkafka::RdkafkaTopicPartitionListError] if the native pause call
#   returns a non-zero response
def pause(list)
  # NOTE(review): presumably rejects calls on an already-closed consumer — confirm in helper.
  closed_consumer_check(__method__)

  raise TypeError, "list has to be a TopicPartitionList" unless list.is_a?(TopicPartitionList)

  native_tpl = list.to_native_tpl

  begin
    status = @native_kafka.with_inner do |handle|
      Rdkafka::Bindings.rd_kafka_pause_partitions(handle, native_tpl)
    end
    return nil if status == 0

    # Build the error payload from the native list rather than the caller's
    # object; presumably the native call may have annotated it — TODO confirm.
    reported = TopicPartitionList.from_native_tpl(native_tpl)
    raise Rdkafka::RdkafkaTopicPartitionListError.new(status, reported, "Error pausing '#{reported.to_h}'")
  ensure
    # The native list is released on every path out of the block above.
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native_tpl)
  end
end