Class: Fluent::Plugin::Parser
Direct Known Subclasses
Compat::Parser, Apache2Parser, CSVParser, InHttpParser, JSONParser, LabeledTSVParser, MessagePackParser, MultilineParser, NoneParser, RegexpParser, SyslogParser, TSVParser
Defined Under Namespace
Classes: ParserError, TimeoutChecker
Constant Summary
collapse
- AVAILABLE_PARSER_VALUE_TYPES =
['string', 'integer', 'float', 'bool', 'time', 'array']
- PARSER_TYPES =
[:text_per_line, :text, :binary]
- TRUTHY_VALUES =
['true', 'yes', '1']
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
Attributes inherited from Base
#under_plugin_development
Instance Method Summary
collapse
-
#build_type_converters(types) ⇒ Object
-
#call(*a, &b) ⇒ Object
-
#configure(conf) ⇒ Object
-
#convert_values(time, record) ⇒ Object
def parse(text, &block) time, record = convert_values(time, record) yield time, record end.
-
#implement?(feature) ⇒ Boolean
-
#initialize ⇒ Parser
constructor
A new instance of Parser.
-
#parse(text, &block) ⇒ Object
-
#parse_io(io, &block) ⇒ Object
-
#parse_partial_data(data, &block) ⇒ Object
-
#parse_time(record) ⇒ Object
-
#parse_with_timeout(text, &block) ⇒ Object
-
#parser_type ⇒ Object
-
#start ⇒ Object
-
#stop ⇒ Object
-
#string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern) ⇒ Object
included, #time_parser_create
#log, #owner, #owner=
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminate, #terminated?
#system_config, #system_config_override
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ Parser
Returns a new instance of Parser.
108
109
110
111
112
|
# File 'lib/fluent/plugin/parser.rb', line 108
def initialize
super
@timeout_checker = nil
end
|
Instance Attribute Details
#type_converters ⇒ Object
101
102
103
|
# File 'lib/fluent/plugin/parser.rb', line 101
def type_converters
@type_converters
end
|
Instance Method Details
#build_type_converters(types) ⇒ Object
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
# File 'lib/fluent/plugin/parser.rb', line 228
def build_type_converters(types)
return nil unless types
converters = {}
types.each_pair do |field_name, type_definition|
type, option = type_definition.split(":", 2)
unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
end
conv = case type
when 'string' then ->(v){ v.to_s }
when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i }
when 'float' then ->(v){ v.to_f rescue v.to_s.to_f }
when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) }
when 'time'
timep = if option
time_type = 'string' timezone, time_format = option.split(':', 2)
unless Fluent::Timezone.validate(timezone)
timezone, time_format = nil, option
end
if Fluent::TimeMixin::TIME_TYPES.include?(time_format)
time_type, time_format = time_format, nil end
time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone)
else
time_parser_create(type: :string, format: nil, timezone: nil)
end
->(v){ timep.parse(v) rescue nil }
when 'array'
delimiter = option ? option.to_s : ','
->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } }
else
raise "BUG: unknown type even after check: #{type}"
end
converters[field_name] = conv
end
converters
end
|
#call(*a, &b) ⇒ Object
157
158
159
160
161
|
# File 'lib/fluent/plugin/parser.rb', line 157
def call(*a, &b)
parse(*a, &b)
end
|
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/fluent/plugin/parser.rb', line 114
def configure(conf)
super
@time_parser = time_parser_create
@type_converters = build_type_converters(@types)
@execute_convert_values = @type_converters || @null_value_pattern || @null_empty_string
@timeout_checker = if @timeout
class << self
alias_method :parse_orig, :parse
alias_method :parse, :parse_with_timeout
end
TimeoutChecker.new(@timeout)
else
nil
end
end
|
#convert_values(time, record) ⇒ Object
def parse(text, &block)
time, record = convert_values(time, record)
yield time, record
end
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/fluent/plugin/parser.rb', line 202
def convert_values(time, record)
return time, record unless @execute_convert_values
record.each_key do |key|
value = record[key]
next unless value
if value.is_a?(String) && string_like_null(value)
record[key] = nil
next
end
if @type_converters && @type_converters.has_key?(key)
record[key] = @type_converters[key].call(value)
end
end
return time, record
end
|
#implement?(feature) ⇒ Boolean
163
164
165
166
167
168
169
170
171
|
# File 'lib/fluent/plugin/parser.rb', line 163
def implement?(feature)
methods_of_plugin = self.class.instance_methods(false)
case feature
when :parse_io then methods_of_plugin.include?(:parse_io)
when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data)
else
raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
end
end
|
#parse(text, &block) ⇒ Object
143
144
145
|
# File 'lib/fluent/plugin/parser.rb', line 143
def parse(text, &block)
raise NotImplementedError, "Implement this method in child class"
end
|
#parse_io(io, &block) ⇒ Object
173
174
175
|
# File 'lib/fluent/plugin/parser.rb', line 173
def parse_io(io, &block)
raise NotImplementedError, "Optional API #parse_io is not implemented"
end
|
#parse_partial_data(data, &block) ⇒ Object
177
178
179
|
# File 'lib/fluent/plugin/parser.rb', line 177
def parse_partial_data(data, &block)
raise NotImplementedError, "Optional API #parse_partial_data is not implemented"
end
|
#parse_time(record) ⇒ Object
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/fluent/plugin/parser.rb', line 181
def parse_time(record)
if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
src = if @keep_time_key
record[@time_key]
else
record.delete(@time_key)
end
@time_parser.parse(src)
elsif @estimate_current_event
Fluent::EventTime.now
else
nil
end
rescue Fluent::TimeParser::TimeParseError => e
raise ParserError, e.message
end
|
#parse_with_timeout(text, &block) ⇒ Object
147
148
149
150
151
152
153
154
155
|
# File 'lib/fluent/plugin/parser.rb', line 147
def parse_with_timeout(text, &block)
@timeout_checker.execute {
parse_orig(text, &block)
}
rescue UncatchableError
log.warn "parsing timed out with #{self.class}: text = #{text}"
yield nil, nil
end
|
#parser_type ⇒ Object
104
105
106
|
# File 'lib/fluent/plugin/parser.rb', line 104
def parser_type
:text_per_line
end
|
#start ⇒ Object
131
132
133
134
135
|
# File 'lib/fluent/plugin/parser.rb', line 131
def start
super
@timeout_checker.start if @timeout_checker
end
|
#stop ⇒ Object
137
138
139
140
141
|
# File 'lib/fluent/plugin/parser.rb', line 137
def stop
super
@timeout_checker.stop if @timeout_checker
end
|
#string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern) ⇒ Object
222
223
224
|
# File 'lib/fluent/plugin/parser.rb', line 222
def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern)
null_empty_string && value.empty? || null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) }
end
|