Class: Fluent::Plugin::MonitorAgentInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::MonitorAgentInput
show all
- Defined in:
- lib/fluent/plugin/in_monitor_agent_modified.rb
Defined Under Namespace
Classes: APIHandler, NotFoundJson
Constant Summary
collapse
- MONITOR_INFO =
They are deprecated but remain for compatibility
{
# Each value is a zero-argument lambda that get_monitor_info runs with
# instance_exec on the plugin, so `self` and every @ivar below belong to the
# plugin being inspected. A throw(:skip) is caught by get_monitor_info and
# leaves the key out of the resulting hash.
'output_plugin' => ->(){ is_a?(::Fluent::Plugin::Output) },
# The three buffer_* probes skip any plugin whose @buffer is undefined, nil,
# or not a ::Fluent::Plugin::Buffer.
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
# Yields nil (the key is still reported) when the plugin has no @num_errors.
'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
}
- IGNORE_ATTRIBUTES =
# Instance variables that get_monitor_info leaves out of the
# 'instance_variables' dump it builds for the :with_debug_info option.
%i(@config_root_section @config @masked_config)
- RETRY_INFO =
{
# Output key => name of the instance variable that get_retry_info reads
# from the retry object it is given.
'start' => '@start',
'steps' => '@steps',
'next_time' => '@next_time',
}
Instance Method Summary
collapse
Constructor Details
Returns a new instance of MonitorAgentInput.
184
185
186
187
188
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 184
# Builds the plugin instance.
#
# @first_warn is a one-shot latch: get_monitor_info sets it to true after
# logging its first NoMethodError so the same diagnostic is not repeated.
def initialize
  super
  @first_warn = false
end
|
Instance Method Details
#all_plugins ⇒ Object
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 250
# Gathers every plugin instance known to the root agent into one flat Array:
# the root agent's inputs, outputs and filters (in that order), followed by
# the outputs and filters of each label.
#
# @return [Array] a new array; the agent's own collections are not modified
def all_plugins
  root = Fluent::Engine.root_agent
  plugins = root.inputs + root.outputs + root.filters
  root.labels.each do |_name, label|
    plugins.concat(label.outputs)
    plugins.concat(label.filters)
  end
  plugins
end
|
#configure(conf) ⇒ Object
190
191
192
193
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 190
# Applies the configuration through the parent class, then offsets the
# configured port by this worker's id (presumably so that each worker
# process listens on its own port).
def configure(conf)
  super
  @port = @port + fluentd_worker_id
end
|
#fluentd_opts ⇒ Object
403
404
405
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 403
# Memoized accessor for the options returned by get_fluentd_opts: the lookup
# runs on the first call and the cached value is returned afterwards.
def fluentd_opts
  return @fluentd_opts if @fluentd_opts

  @fluentd_opts = get_fluentd_opts
end
|
#get_fluentd_opts ⇒ Object
407
408
409
410
411
412
413
414
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 407
# Looks for a live Fluent::Supervisor object in ObjectSpace and returns a
# copy of its options merged into a fresh Hash. Only the first supervisor
# found is used; an empty Hash is returned when there is none.
#
# @return [Hash]
def get_fluentd_opts
  supervisor = ObjectSpace.each_object(Fluent::Supervisor).first
  return {} unless supervisor

  {}.merge(supervisor.options)
end
|
#get_monitor_info(pe, opts = {}) ⇒ Object
Get monitor info from the plugin `pe` and return a hash object.
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 317
# Collects monitoring data for the plugin +pe+ and returns it as a Hash.
#
# The result always carries 'plugin_id', 'plugin_category' and 'type', plus
# one entry per MONITOR_INFO key whose probe neither threw :skip nor raised.
# When the plugin responds to #statistics, its 'output' section (if any) is
# merged in as well.
#
# @param pe [Object] plugin instance; must respond to #plugin_id and #config
# @param opts [Hash] optional switches:
#   :with_config     - include pe.config under 'config'
#   :with_retry      - include get_retry_info(pe.retry) under 'retry'
#                      (only when pe has an @retry instance variable)
#   :with_debug_info - dump every instance variable except IGNORE_ATTRIBUTES
#                      under 'instance_variables'
#   :ivars           - list of instance variable names (without '@') to dump
#                      under 'instance_variables'; ignored when
#                      :with_debug_info is set
# @return [Hash]
def get_monitor_info(pe, opts={})
  obj = {}
  obj['plugin_id'] = pe.plugin_id
  obj['plugin_category'] = plugin_category(pe)
  obj['type'] = pe.config['@type']
  obj['config'] = pe.config if opts[:with_config]

  MONITOR_INFO.each_pair {|key,code|
    begin
      # A probe calls throw(:skip) to say "not applicable to this plugin";
      # in that case the key is simply left out.
      catch(:skip) do
        obj[key] = pe.instance_exec(&code)
      end
    rescue NoMethodError => e
      # Logged only once per agent (see @first_warn); later occurrences are
      # dropped silently.
      unless @first_warn
        log.error "NoMethodError in monitoring plugins", key: key, plugin: pe.class, error: e
        log.error_backtrace
        @first_warn = true
      end
    rescue => e
      log.warn "unexpected error in monitoring plugins", key: key, plugin: pe.class, error: e
    end
  }

  if pe.respond_to?(:statistics)
    obj.merge!(pe.statistics['output'] || {})
  end

  obj['retry'] = get_retry_info(pe.retry) if opts[:with_retry] && pe.instance_variable_defined?(:@retry)

  if opts[:with_debug_info]
    iv = {}
    pe.instance_eval do
      instance_variables.each {|sym|
        next if IGNORE_ATTRIBUTES.include?(sym)
        # Strip the leading '@' so the key is the bare variable name.
        key = sym.to_s[1..-1]
        iv[key] = instance_variable_get(sym)
      }
    end
    obj['instance_variables'] = iv
  elsif ivars = opts[:ivars]
    iv = {}
    ivars.each {|name|
      iname = "@#{name}"
      iv[name] = pe.instance_variable_get(iname) if pe.instance_variable_defined?(iname)
    }
    obj['instance_variables'] = iv
  end

  obj
end
|
#get_retry_info(pe_retry) ⇒ Object
378
379
380
381
382
383
384
385
386
387
388
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 378
# Extracts the retry state listed in RETRY_INFO from +pe_retry+ by reading
# the corresponding instance variables.
#
# @param pe_retry [Object, nil] retry object of a plugin, or nil/false
# @return [Hash] one entry per RETRY_INFO key; empty when +pe_retry+ is falsy
def get_retry_info(pe_retry)
  return {} unless pe_retry

  RETRY_INFO.each_with_object({}) do |(label, ivar_name), info|
    info[label] = pe_retry.instance_variable_get(ivar_name)
  end
end
|
#multi_workers_ready? ⇒ Boolean
195
196
197
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 195
# Always answers true. (Presumably the hook Fluentd consults before running a
# plugin under several workers - TODO confirm; #configure gives each worker a
# port offset by its worker id.)
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
|
#plugin_category(pe) ⇒ Object
390
391
392
393
394
395
396
397
398
399
400
401
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 390
# Classifies a plugin instance by its class.
#
# @param pe [Object] plugin instance
# @return [String] frozen 'input', 'output', 'filter' or 'unknown'
def plugin_category(pe)
  case pe
  when Fluent::Plugin::Input then 'input'.freeze
  # Every output flavour is reported under the single 'output' category.
  when Fluent::Plugin::Output, Fluent::Plugin::MultiOutput, Fluent::Plugin::BareOutput then 'output'.freeze
  when Fluent::Plugin::Filter then 'filter'.freeze
  else 'unknown'.freeze
  end
end
|
#plugin_info_by_id(plugin_id, opts = {}) ⇒ Object
search a plugin by plugin_id
286
287
288
289
290
291
292
293
294
295
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 286
# Finds the first plugin whose plugin_id (compared as a String) equals
# +plugin_id+ and returns its monitor info.
#
# @param plugin_id [String] id to look for
# @param opts [Hash] passed through to get_monitor_info
# @return [Hash, nil] monitor info, or nil when no plugin has that id
def plugin_info_by_id(plugin_id, opts={})
  target = all_plugins.find do |candidate|
    # Objects without a public plugin_id can never match.
    candidate.respond_to?(:plugin_id) && candidate.plugin_id.to_s == plugin_id
  end
  return nil unless target

  get_monitor_info(target, opts)
end
|
#plugin_info_by_tag(tag, opts = {}) ⇒ Object
Try to match the tag and get the info from the matched output plugin. TODO: Support output in label.
273
274
275
276
277
278
279
280
281
282
283
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 273
# Returns monitor info for the first match rule of the root agent's event
# router that matches +tag+ and whose collector is an output plugin
# (Fluent::Plugin::Output or Fluent::Output). Only the root agent's router
# is consulted.
#
# @param tag [String] event tag to match
# @param opts [Hash] passed through to get_monitor_info
# @return [Hash, nil] monitor info, or nil when no such rule exists
def plugin_info_by_tag(tag, opts={})
  # The router does not expose its rules here, so they are read straight
  # from its instance variable.
  rules = Fluent::Engine.root_agent.event_router.instance_variable_get(:@match_rules)
  hit = rules.find do |rule|
    next false unless rule.match?(tag)

    rule.collector.is_a?(Fluent::Plugin::Output) || rule.collector.is_a?(Fluent::Output)
  end
  hit ? get_monitor_info(hit.collector, opts) : nil
end
|
#plugins_info_all(opts = {}) ⇒ Object
308
309
310
311
312
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 308
# Builds the monitor info of every plugin returned by all_plugins, in the
# same order.
#
# @param opts [Hash] passed through to get_monitor_info
# @return [Array<Hash>]
def plugins_info_all(opts={})
  all_plugins.map do |plugin|
    get_monitor_info(plugin, opts)
  end
end
|
#plugins_info_by_type(type, opts = {}) ⇒ Object
This method returns an array because multiple plugins could have the same type
299
300
301
302
303
304
305
306
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 299
# Returns the monitor info of every plugin whose configured '@type' equals
# +type+. The result is an Array because several plugins may share a type.
#
# @param type [String] value to compare with each plugin's config['@type']
# @param opts [Hash] passed through to get_monitor_info
# @return [Array<Hash>] possibly empty
def plugins_info_by_type(type, opts={})
  same_type = all_plugins.select do |plugin|
    begin
      plugin.config['@type'] == type
    rescue StandardError
      # A plugin whose config cannot be read is treated as "no match".
      nil
    end
  end
  same_type.map { |plugin| get_monitor_info(plugin, opts) }
end
|
#start ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/fluent/plugin/in_monitor_agent_modified.rb', line 206
# Starts the plugin: brings up the monitoring HTTP endpoints and, when a tag
# is configured, periodically emits one record per plugin to that tag.
#
# HTTP routes registered on @bind:@port:
#   /api/plugins, /api/plugins.json, /api/config, /api/config.json
# Anything else is handled by NotFoundJson.
#
# Emitted records are the hashes from plugins_info_all (without config and
# retry info). A record that has 'emit_records' additionally gets 'records':
# the difference to the 'emit_records' value at the same position in the
# previous snapshot. When the previous snapshot has no entry or no counter at
# that position, the baseline is 0 instead of raising inside the timer.
def start
  super

  @old = nil

  log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins for worker#{fluentd_worker_id}"
  api_handler = APIHandler.new(self)
  create_http_server(:in_monitor_http_server_helper, addr: @bind, port: @port, logger: log, default_app: NotFoundJson) do |serv|
    serv.get('/api/plugins') { |req| api_handler.plugins_ltsv(req) }
    serv.get('/api/plugins.json') { |req| api_handler.plugins_json(req) }
    serv.get('/api/config') { |req| api_handler.config_ltsv(req) }
    serv.get('/api/config.json') { |req| api_handler.config_json(req) }
  end

  return unless @tag

  log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"
  opts = {with_config: false, with_retry: false}

  # Baseline snapshot for the first delta computed by the timer below.
  @old = plugins_info_all(opts)

  timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) do
    es = Fluent::MultiEventStream.new
    now = Fluent::Engine.now
    @new = plugins_info_all(opts)
    @new.each_with_index do |record, index|
      emitted = record["emit_records"]
      if emitted
        # Snapshots are paired by position; tolerate a missing counterpart.
        previous = @old[index]
        baseline = previous && previous["emit_records"]
        record["records"] = emitted - (baseline || 0)
      end
      es.add(now, record)
    end
    @old = @new
    router.emit_stream(@tag, es)
  end
end
|