Class: Kafka::Consumer

Inherits:
Object
Defined in:
lib/kafka/consumer.rb

Constructor Details

#initialize(config) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:



# File 'lib/kafka/consumer.rb', line 17

def initialize(config)
  # Initialize the client
  @client = Kafka::FFI::Consumer.new(config)

  # Redirect the main event queue so calls to consumer_poll will fire
  # callbacks instead of having to have a separate poller thread.
  @client.poll_set_consumer
end

Instance Attribute Details

#client ⇒ Kafka::FFI::Consumer (readonly)

Returns the backing Kafka::FFI::Consumer.



# File 'lib/kafka/consumer.rb', line 14

def client
  @client
end

Instance Method Details

#assignments ⇒ Hash{String => Array<Integer>}

Retrieves the set of topic + partition assignments for the consumer.

Examples:

consumer.assignments # => { "topic" => [1, 2, 3] }

Returns:

  • (Hash{String => Array<Integer>})

    List of partition assignments keyed by the topic name.



# File 'lib/kafka/consumer.rb', line 45

def assignments
  @client.assignment
end
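
A minimal sketch of working with the returned hash. A live consumer needs a running broker, so a literal hash with made-up topic names stands in for the value of `consumer.assignments` here:

```ruby
# Stand-in for the Hash{String => Array<Integer>} returned by
# consumer.assignments. Topic names and partitions are illustrative only.
assignments = { "events" => [0, 1, 2], "audit" => [0] }

# Flatten into [topic, partition] pairs, e.g. for logging or metrics.
pairs = assignments.flat_map do |topic, partitions|
  partitions.map { |partition| [topic, partition] }
end

# Total number of partitions currently assigned across all topics.
total = assignments.values.sum(&:size)
```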

#close ⇒ Object

Note:

After calling #close it is unsafe to call any other method on the Consumer.

Gracefully shut down the consumer and its connections.



# File 'lib/kafka/consumer.rb', line 73

def close
  # @see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-level-kafkaconsumer

  # Gracefully shut down the consumer, leaving the consumer group,
  # committing any remaining offsets, and releasing resources back to the
  # system.
  #
  # This will effectively call #close on the Client automatically. Trying
  # to follow the documentation and calling #close before #destroy caused
  # warnings due to brokers disconnecting but just calling #destroy fixes
  # that.
  @client.destroy
end
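
Because no other method may be called after #close, it can help to confine the consumer to a block and close it in an `ensure` clause. The sketch below is one way to do that. `with_consumer` and `ClosableStub` are hypothetical names introduced for illustration and are not part of the library:

```ruby
# Hypothetical helper: yields the consumer and always closes it afterwards,
# even when the block raises.
def with_consumer(consumer)
  yield consumer
ensure
  consumer.close
end

# Minimal stand-in that only records whether #close was called, so the
# pattern can be exercised without a broker.
class ClosableStub
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

stub = ClosableStub.new

begin
  with_consumer(stub) { |_consumer| raise "processing failed" }
rescue RuntimeError
  # The error still propagates to the caller, but the consumer was closed.
end
```

With a real Kafka::Consumer in place of the stub, the consumer should not be referenced again once the helper returns.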

#commit(msg, async: false) ⇒ Object

Parameters:



# File 'lib/kafka/consumer.rb', line 58

def commit(msg, async: false)
  list = Kafka::FFI::TopicPartitionList.new(1)

  list.add(msg.topic, msg.partition)
  list.set_offset(msg.topic, msg.partition, msg.offset + 1)

  @client.commit(list, async)
ensure
  list.destroy
end
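
The listing above commits `msg.offset + 1` rather than `msg.offset`: the offset stored for a partition is the position of the next message to read, not of the last one processed. A minimal sketch of that arithmetic, where `Message` and `offset_to_commit` are hypothetical stand-ins and not library classes:

```ruby
# Hypothetical stand-in for a consumed message. Only the three fields
# read by #commit are modeled.
Message = Struct.new(:topic, :partition, :offset)

# Returns the offset to commit after processing msg: the offset of the
# next message to read on that partition.
def offset_to_commit(msg)
  msg.offset + 1
end

msg = Message.new("events", 0, 41)
next_offset = offset_to_commit(msg) # => 42
```

Committing `msg.offset` itself would cause the same message to be delivered again after a restart or rebalance.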

#poll(timeout: 250, &block) ⇒ Object

Poll the consumer for a waiting message.

Parameters:

  • timeout (Integer) (defaults to: 250)

    Time to wait in milliseconds for a message to be available.



# File 'lib/kafka/consumer.rb', line 53

def poll(timeout: 250, &block)
  @client.consumer_poll(timeout, &block)
end
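
A typical caller invokes #poll in a loop and handles each message in the block. The sketch below shows that shape against a hypothetical in-memory `FakeConsumer` that mirrors the `poll(timeout:, &block)` signature. Its queueing behavior and return value are assumptions made so the loop can run without a broker, and say nothing about what the real method returns:

```ruby
# Hypothetical in-memory stand-in with the same #poll signature as
# Kafka::Consumer. It is not part of the library.
class FakeConsumer
  def initialize(messages)
    @messages = messages.dup
  end

  # Yields the next queued message, if there is one. The timeout is
  # accepted for signature compatibility but ignored.
  def poll(timeout: 250)
    msg = @messages.shift
    yield msg if msg && block_given?
    msg
  end

  def empty?
    @messages.empty?
  end
end

consumer = FakeConsumer.new(%w[a b c])
seen = []

# Poll until the stand-in is drained. A real loop would usually run until
# the process is asked to shut down.
until consumer.empty?
  consumer.poll(timeout: 100) { |msg| seen << msg }
end
```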

#subscribe(topic, *rest) ⇒ Object

Subscribe the consumer to the given list of topics. Once the subscriptions have become active and partitions assigned, calls to #poll will yield messages for the subscribed topics.

#subscribe replaces the current list of subscriptions: any topic not included in the most recent call is unsubscribed.

Parameters:

  • topic (String, Array<String>)

    Topics to subscribe to



# File 'lib/kafka/consumer.rb', line 34

def subscribe(topic, *rest)
  @client.subscribe(topic, *rest)
end
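
The replace-not-append behavior described above can be modeled with a small stand-in. `SubscriptionList` is a hypothetical class, and flattening the arguments is an assumption about how a String or an Array<String> could be accepted. It is not taken from the library's implementation:

```ruby
# Hypothetical model of subscription bookkeeping. Not part of the library.
class SubscriptionList
  attr_reader :topics

  def initialize
    @topics = []
  end

  # Accepts a single topic, several topics, or an array of topics, and
  # replaces whatever was subscribed before.
  def subscribe(topic, *rest)
    @topics = [topic, *rest].flatten
  end
end

subs = SubscriptionList.new
subs.subscribe("events", "audit")
first = subs.topics.dup

# The second call drops "events" and "audit" because they are not listed.
subs.subscribe(["metrics"])
second = subs.topics.dup
```

To add a topic without dropping the existing ones, pass the full desired list on every call.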