Class: Fluent::Plugin::RecordTransformerFilter
- Defined in:
- lib/fluent/plugin/filter_record_transformer.rb
Defined Under Namespace
Classes: PlaceholderExpander, RubyPlaceholderExpander
Constant Summary
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Filter
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
Methods inherited from Filter
#emit_records, #emit_size, #filter, #filter_with_time, #initialize, #measure_metrics, #statistics
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
included, #initialize, #terminate
Methods included from Fluent::PluginId
#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type
Constructor Details
This class inherits a constructor from Fluent::Plugin::Filter
Instance Method Details
#configure(conf) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/fluent/plugin/filter_record_transformer.rb', line 45 def configure(conf) super map = {} # <record></record> directive conf.elements.select { |element| element.name == 'record' }.each do |element| element.each_pair do |k, v| element.has_key?(k) # to suppress unread configuration warning map[k] = parse_value(v) end end if @keep_keys raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`" unless @renew_record end @key_deleters = if @remove_keys @remove_keys.map { |k| record_accessor_create(k) } end = { log: log, auto_typecast: @auto_typecast, } @placeholder_expander = if @enable_ruby # require utilities which would be used in ruby placeholders require 'pathname' require 'uri' require 'cgi' RubyPlaceholderExpander.new() else PlaceholderExpander.new() end @map = @placeholder_expander.preprocess_map(map) @hostname = Socket.gethostname end |
#filter_stream(tag, es) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/filter_record_transformer.rb', line 84 def filter_stream(tag, es) new_es = Fluent::MultiEventStream.new tag_parts = tag.split('.') tag_prefix = tag_prefix(tag_parts) tag_suffix = tag_suffix(tag_parts) placeholder_values = { 'tag' => tag, 'tag_parts' => tag_parts, 'tag_prefix' => tag_prefix, 'tag_suffix' => tag_suffix, 'hostname' => @hostname, } es.each do |time, record| begin placeholder_values['time'] = @placeholder_expander.time_value(time) placeholder_values['record'] = record new_record = reform(record, placeholder_values) if @renew_time_key && new_record.has_key?(@renew_time_key) time = Fluent::EventTime.from_time(Time.at(new_record[@renew_time_key].to_f)) end @key_deleters.each { |deleter| deleter.delete(new_record) } if @key_deleters new_es.add(time, new_record) rescue => e router.emit_error_event(tag, time, record, e) log.debug { "map:#{@map} record:#{record} placeholder_values:#{placeholder_values}" } end end new_es end |