Class: Kafka::FFI::Queue

Inherits:
OpaquePointer show all
Defined in:
lib/kafka/ffi/queue.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Class Method Details

.new(client) ⇒ Object



8
9
10
# File 'lib/kafka/ffi/queue.rb', line 8

def self.new(client)
  ::Kafka::FFI.rd_kafka_queue_new(client)
end

Instance Method Details

#destroyObject

Release the applications reference on the queue, possibly destroying it and releasing it’s resources.



59
60
61
62
63
# File 'lib/kafka/ffi/queue.rb', line 59

def destroy
  if !pointer.null?
    ::Kafka::FFI.rd_kafka_queue_destroy(self)
  end
end

#forward(dest) ⇒ Object

Forward events meant for this Queue to the destination Queue instead.

Parameters:

  • dest (Queue)

    Destination queue to forward

  • dest (nil)

    Remove forwarding for this queue.



46
47
48
# File 'lib/kafka/ffi/queue.rb', line 46

def forward(dest)
  ::Kafka::FFI.rd_kafka_queue_forward(self, dest)
end

#lengthInteger

Retrieve the current number of elemens in the queue.

Returns:

  • (Integer)

    Number of elements in the queue



53
54
55
# File 'lib/kafka/ffi/queue.rb', line 53

def length
  ::Kafka::FFI.rd_kafka_queue_length(self)
end

#poll(timeout: 1000) {|event| ... } ⇒ nil, Event

Poll a queue for an event, waiting up to timeout milliseconds. Takes an optional block which will handle destroying the event at the completion of the block.

Parameters:

  • timeout (Integer) (defaults to: 1000)

    Max time to wait in millseconds for an Event.

Yields:

  • (event)

Yield Parameters:

  • event (Event)

    Polled event

Returns:

  • (nil)

    No event was available within the timeout

  • (Event)

    Event polled from the queue, application is responsible for calling #destroy on the Event when finished with it.

  • When passed a block, the result returned by the block



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/kafka/ffi/queue.rb', line 25

def poll(timeout: 1000)
  event = ::Kafka::FFI.rd_kafka_queue_poll(self, timeout)
  if event.nil?
    return nil
  end

  if block_given?
    begin
      yield(event)
    ensure
      event.destroy
    end
  else
    event
  end
end