Class: LogStash::Inputs::Kafka

Inherits:
Base
  • Object
Defined in:
lib/logstash/inputs/kafka.rb

Overview

This input reads events from a Kafka topic. It uses the high-level consumer API provided by Kafka to read messages from the broker and maintains the state of what has been consumed in Zookeeper. The default input codec is json.

You must configure exactly one of topic_id, white_list, or black_list. By default the input connects to a Zookeeper instance running on localhost; all broker information is read from Zookeeper state.

Ideally, the number of consumer threads should equal the number of partitions for a perfect balance; more threads than partitions means that some threads will be idle.

For more information see kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs
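
For orientation, here is a minimal pipeline sketch. The Zookeeper address, group name, topic name, and thread count are illustrative assumptions; the option names match the settings consumed by register and run below.

  input {
    kafka {
      zk_connect       => "localhost:2181"  # Zookeeper quorum holding broker state
      group_id         => "logstash"        # consumer group used for offset tracking
      topic_id         => "my_topic"        # exactly one of topic_id, white_list, black_list
      consumer_threads => 4                 # ideally equal to the topic's partition count
    }
  }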

Instance Method Summary

  #register ⇒ Object
  #run(logstash_queue) ⇒ Object

Instance Method Details

#register ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 86

def register
  LogStash::Logger.setup_log4j(@logger)
  options = {
      :zk_connect => @zk_connect,
      :group_id => @group_id,
      :topic_id => @topic_id,
      :auto_offset_reset => @auto_offset_reset,
      :rebalance_max_retries => @rebalance_max_retries,
      :rebalance_backoff_ms => @rebalance_backoff_ms,
      :consumer_timeout_ms => @consumer_timeout_ms,
      :consumer_restart_on_error => @consumer_restart_on_error,
      :consumer_restart_sleep_ms => @consumer_restart_sleep_ms,
      :consumer_id => @consumer_id,
      :fetch_message_max_bytes => @fetch_message_max_bytes,
      :allow_topics => @white_list,
      :filter_topics => @black_list
  }
  if @reset_beginning
    options[:reset_beginning] = 'from-beginning'
  end # if :reset_beginning
  topic_or_filter = [@topic_id, @white_list, @black_list].compact
  if topic_or_filter.count == 0
    raise('topic_id, white_list or black_list required.')
  elsif topic_or_filter.count > 1
    raise('Invalid combination of topic_id, white_list or black_list. Use only one.')
  end
  @kafka_client_queue = SizedQueue.new(@queue_size)
  @consumer_group = create_consumer_group(options)
  @logger.info('Registering kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
end
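
Because register rejects any combination of topic_id, white_list, and black_list, a wildcard subscription uses white_list on its own. The sketch below is a hypothetical example; the pattern is handed to the consumer as a topic filter (:allow_topics), with black_list (:filter_topics) as its inverse.

  input {
    kafka {
      zk_connect => "localhost:2181"
      group_id   => "logstash"
      white_list => "logs\..*"   # hypothetical topic filter used instead of a fixed topic_id
    }
  }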

#run(logstash_queue) ⇒ Object



# File 'lib/logstash/inputs/kafka.rb', line 118

def run(logstash_queue)
  java_import 'kafka.common.ConsumerRebalanceFailedException'
  @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
  begin
    @consumer_group.run(@consumer_threads,@kafka_client_queue)
    begin
      while true
        event = @kafka_client_queue.pop
        queue_event(event, logstash_queue)
      end
    rescue LogStash::ShutdownSignal
      @logger.info('Kafka got shutdown signal')
      @consumer_group.shutdown
    end
    until @kafka_client_queue.empty?
      queue_event(@kafka_client_queue.pop,logstash_queue)
    end
    @logger.info('Done running kafka input')
  rescue => e
    @logger.warn('kafka client threw exception, restarting',
                 :exception => e)
    if @consumer_group.running?
      @consumer_group.shutdown
    end
    sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
    retry
  end
  finished
end
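
When the consumer group throws, run shuts it down, sleeps consumer_restart_sleep_ms milliseconds (converted to seconds in the sleep call), and retries. A hedged sketch of tuning that pause along with the internal queue capacity used by register:

  input {
    kafka {
      zk_connect                => "localhost:2181"
      topic_id                  => "my_topic"    # illustrative topic name
      consumer_restart_sleep_ms => 1000          # pause before the input restarts the consumer
      queue_size                => 20            # capacity of the SizedQueue feeding the pipeline
    }
  }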