Class: Fluent::Plugin::RdKafkaGroupInput
Defined Under Namespace
Classes: Batch, ForShutdown
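Neither helper class is shown on this page. Judging from how #each_batch and #emit_events use them, they are roughly equivalent to the following sketch (the attribute names come from the code below; everything else, including the ForShutdown superclass, is an assumption):

# Sketch only -- reconstructed from usage in #each_batch and #emit_events.
class Batch
  attr_reader :topic, :messages

  # a batch collects messages polled from a single topic
  def initialize(topic)
    @topic = topic
    @messages = []
  end
end

# Raised by #emit_events once the consumer has been torn down, so that #run
# can unwind quietly during shutdown. The StandardError parent is assumed.
class ForShutdown < StandardError
end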
Constant Summary
- BufferError = Fluent::Plugin::Buffer::BufferOverflowError
Constants included from KafkaPluginUtil::SSLSettings: DummyFormatter
Instance Method Summary
Methods included from KafkaPluginUtil::SaslSettings: included
Methods included from KafkaPluginUtil::SSLSettings: included, #pickup_ssl_endpoint, #read_ssl_file
Constructor Details

#initialize ⇒ RdKafkaGroupInput

Returns a new instance of RdKafkaGroupInput.
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 65
def initialize
  super
  @time_parser = nil
  @retry_count = 1
end
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 86
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super
  log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"
  log.info "Will watch for topics #{@topics} at brokers " \
    "#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"
  @topics = _config_to_array(@topics)
  parser_conf = conf.elements('parse').first
  unless parser_conf
    raise Fluent::ConfigError, "<parse> section or format parameter is required."
  end
  unless parser_conf["@type"]
    raise Fluent::ConfigError, "parse/@type is required."
  end
  @parser_proc = setup_parser(parser_conf)
  @time_source = :record if @use_record_time
  if @time_source == :record and @time_format
    @time_parser = Fluent::TimeParser.new(@time_format)
  end
end
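As an illustration of the <parse> requirement enforced above, the plugin can be configured through the standard Fluentd test driver without touching Kafka. This is a sketch: the parameter names topics and kafka_configs are inferred from the instance variables used in #configure and may not match the actual config_param declarations, and the broker/group values are made up.

require "fluent/test"
require "fluent/test/driver/input"
require "fluent/plugin/in_rdkafka_group"

Fluent::Test.setup

# Hypothetical configuration, including the mandatory <parse> section.
conf = %[
  topics        "my.topic"
  kafka_configs {"bootstrap.servers":"localhost:9092","group.id":"fluentd"}
  <parse>
    @type json
  </parse>
]

driver = Fluent::Test::Driver::Input.new(Fluent::Plugin::RdKafkaGroupInput).configure(conf)

Omitting the <parse> section would make configure raise the Fluent::ConfigError shown above.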
#each_batch {|batch| ... } ⇒ Object
Executes the passed code block on a batch of messages. Every message in a given batch is guaranteed to belong to the same topic, because the tagging logic in :run relies on that property. The maximum number of messages in a batch is capped by the :max_batch_size configuration value, which ensures that consuming from a single topic for a long time (e.g. with `auto.offset.reset` set to `earliest`) does not lead to memory exhaustion. Also, calling consumer.poll advances the consumer offset, so if the process crashes we may lose at most :max_batch_size messages.
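A minimal usage sketch of that contract (`input` stands for a started plugin instance and `process` is a placeholder; this is an illustration, not runnable on its own):

input.each_batch do |batch|
  # all messages in this batch share batch.topic, and there are at most
  # max_batch_size of them before the batch is yielded
  process(batch.topic, batch.messages)
end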
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 205
def each_batch(&block)
  batch = nil
  message = nil
  while @consumer
    message = @consumer.poll(@max_wait_time_ms)
    if message
      if not batch
        batch = Batch.new(message.topic)
      elsif batch.topic != message.topic || batch.messages.size >= @max_batch_size
        yield batch
        batch = Batch.new(message.topic)
      end
      batch.messages << message
    else
      yield batch if batch
      batch = nil
    end
  end
  yield batch if batch
end
#emit_events(tag, es) ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 284
def emit_events(tag, es)
  retries = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?
    if @retry_emit_limit.nil?
      sleep 1
      retry
    end
    if retries < @retry_emit_limit
      retries += 1
      sleep 1
      retry
    else
      raise RuntimeError, "Exceeds retry_emit_limit"
    end
  end
end
#multi_workers_ready? ⇒ Boolean
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 80
def multi_workers_ready?
  true
end
#reconnect_consumer ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 169
def reconnect_consumer
  log.warn "Stopping Consumer"
  consumer = @consumer
  @consumer = nil
  if consumer
    consumer.close
  end
  log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
  @retry_count = @retry_count + 1
  sleep @retry_wait_seconds
  @consumer = setup_consumer
  log.warn "Re-starting consumer #{Time.now.to_s}"
  @retry_count = 0
rescue => e
  log.error "unexpected error during re-starting consumer object access", :error => e.to_s
  log.error_backtrace
  if @retry_count <= @retry_limit or disable_retry_limit
    reconnect_consumer
  end
end
#run ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 226
def run
  while @consumer
    begin
      each_batch { |batch|
        log.debug "A new batch for topic #{batch.topic} with #{batch.messages.size} messages"
        es = Fluent::MultiEventStream.new
        tag = batch.topic
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix
        batch.messages.each { |msg|
          begin
            record = @parser_proc.call(msg)
            case @time_source
            when :kafka
              record_time = Fluent::EventTime.from_time(msg.timestamp)
            when :now
              record_time = Fluent::Engine.now
            when :record
              if @time_format
                record_time = @time_parser.parse(record[@record_time_key].to_s)
              else
                record_time = record[@record_time_key]
              end
            else
              log.fatal "BUG: invalid time_source: #{@time_source}"
            end
            if @kafka_message_key
              record[@kafka_message_key] = msg.key
            end
            if @add_headers
              msg.headers.each_pair { |k, v|
                record[k] = v
              }
            end
            es.add(record_time, record)
          rescue => e
            log.warn "parser error in #{msg.topic}/#{msg.partition}", :error => e.to_s, :value => msg.payload, :offset => msg.offset
            log.debug_backtrace
          end
        }
        unless es.empty?
          emit_events(tag, es)
        end
      }
    rescue ForShutdown
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
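For illustration, the tag built for each batch above, assuming hypothetical add_prefix and add_suffix values:

topic      = "orders"    # batch.topic
add_prefix = "kafka"     # hypothetical setting
add_suffix = "raw"       # hypothetical setting

tag = topic
tag = add_prefix + "." + tag if add_prefix
tag = tag + "." + add_suffix if add_suffix
tag  #=> "kafka.orders.raw"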
#setup_consumer ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 163
def setup_consumer
  consumer = Rdkafka::Config.new(@kafka_configs).consumer
  consumer.subscribe(*@topics)
  consumer
end
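The same two rdkafka-ruby calls in isolation, with a hypothetical kafka_configs hash (the keys are plain librdkafka settings; only the two referenced in #configure's log line are shown):

require "rdkafka"

kafka_configs = {
  "bootstrap.servers" => "kafka1:9092,kafka2:9092",  # hypothetical brokers
  "group.id"          => "fluentd-consumers"         # hypothetical group id
}

consumer = Rdkafka::Config.new(kafka_configs).consumer
consumer.subscribe("orders", "payments")             # the plugin splats @topics here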
#setup_parser(parser_conf) ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 114
def setup_parser(parser_conf)
  format = parser_conf["@type"]
  case format
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg| Oj.load(msg.payload) }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg| Yajl::Parser.parse(msg.payload) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg| LTSV.parse(msg.payload, {:symbolize_keys => false}).first }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg| MessagePack.unpack(msg.payload) }
  when 'text'
    Proc.new { |msg| {@message_key => msg.payload} }
  else
    @custom_parser = parser_create(usage: 'in-rdkafka-plugin', conf: parser_conf)
    Proc.new { |msg|
      @custom_parser.parse(msg.payload) {|_time, record|
        record
      }
    }
  end
end
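A quick sketch of how one of the returned procs behaves. FakeMessage stands in for an Rdkafka consumer message, since the proc only needs #payload (the plugin also sets Fluent::DEFAULT_OJ_OPTIONS, which is skipped here):

require "oj"

FakeMessage = Struct.new(:payload)

json_proc = Proc.new { |msg| Oj.load(msg.payload) }  # same shape as the 'json' branch
json_proc.call(FakeMessage.new('{"user":"alice","count":3}'))
#=> {"user"=>"alice", "count"=>3}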
#shutdown ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 152
def shutdown
  consumer = @consumer
  @consumer = nil
  consumer.close
  super
end
#start ⇒ Object
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 144
def start
  super
  @consumer = setup_consumer
  thread_create(:in_rdkafka_group, &method(:run))
end