Method: Rdkafka::Consumer#poll

Defined in:
lib/rdkafka/consumer.rb

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

[View source] [View on GitHub]

537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
# File 'lib/rdkafka/consumer.rb', line 537

def poll(timeout_ms)
  closed_consumer_check(__method__)

  message_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms)
  end
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if message_ptr && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end