Class: Fluent::Plugin::JqOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::JqOutput
- Defined in:
- lib/fluent/plugin/out_jq.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ JqOutput
constructor
A new instance of JqOutput.
- #multi_workers_ready? ⇒ Boolean
- #process(tag, es) ⇒ Object
Constructor Details
#initialize ⇒ JqOutput
Returns a new instance of JqOutput.
30 31 32 33 |
# File 'lib/fluent/plugin/out_jq.rb', line 30 def initialize super require "jq" end |
Instance Method Details
#configure(conf) ⇒ Object
35 36 37 38 39 40 |
# File 'lib/fluent/plugin/out_jq.rb', line 35 def configure(conf) super @jq_filter = JQ::Core.new @jq rescue JQ::Error raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{$!.}" end |
#multi_workers_ready? ⇒ Boolean
42 43 44 |
# File 'lib/fluent/plugin/out_jq.rb', line 42 def multi_workers_ready? true end |
#process(tag, es) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_jq.rb', line 46 def process(tag, es) new_es = Fluent::MultiEventStream.new es.each do |time, record| begin @jq_filter.update(MultiJson.dump(tag: tag, time: time, record: record), false) { |r| # the filter could return an array new_records = [MultiJson.load("[#{r}]").first] new_records.flatten! new_records.each { |new_record| new_es.add time, new_record } } rescue JQ::Error log.error "Failed to transform #{MultiJson.dump record} with #{@jq}, error: #{$!.}" end end new_tag = tag.sub(/^#{Regexp.escape(@remove_tag_prefix)}\./, '') router.emit_stream(new_tag, new_es) end |