Class: Fluent::Plugin::JqOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_jq.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JqOutput

Returns a new instance of JqOutput.



30
31
32
33
# File 'lib/fluent/plugin/out_jq.rb', line 30

# Creates the plugin instance.
#
# The jq binding is required here, at construction time, rather than at the
# top of the file, so it is only loaded once an instance is actually built.
def initialize
  super
  require 'jq'
end

Instance Method Details

#configure(conf) ⇒ Object



35
36
37
38
39
40
# File 'lib/fluent/plugin/out_jq.rb', line 35

# Applies the configuration and compiles the jq program held in @jq.
#
# @jq is presumably populated by the parent's configure from a config
# parameter declared outside this listing — confirm against the class body.
#
# @param conf the plugin configuration, forwarded unchanged to the parent
# @raise [Fluent::ConfigError] when a JQ::Error is raised while configuring,
#   i.e. the filter expression could not be compiled
def configure(conf)
  super
  @jq_filter = JQ::Core.new(@jq)
rescue JQ::Error => e
  raise Fluent::ConfigError, "Could not parse jq filter: #{@jq}, error: #{e.message}"
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/fluent/plugin/out_jq.rb', line 42

# Always answers true.
#
# Presumably consulted by Fluentd to decide whether this plugin may be used
# when several workers are configured — confirm against the Output base class.
#
# @return [Boolean] unconditionally true
def multi_workers_ready?
  true
end

#process(tag, es) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_jq.rb', line 46

# Runs the compiled jq filter over every event of the stream and re-emits
# whatever the filter produced.
#
# Each event is handed to jq as a JSON object with the keys "tag", "time" and
# "record". Every result yielded by the filter is added to a new event stream
# under the original event's time; a result that is an array is expanded into
# one event per (flattened) element. Events whose transformation raises
# JQ::Error are logged and skipped. The collected stream is emitted under the
# incoming tag with a leading "<@remove_tag_prefix>." stripped.
#
# @param tag [String] tag of the incoming event stream
# @param es the incoming event stream, iterated as (time, record) pairs
def process(tag, es)
  transformed = Fluent::MultiEventStream.new

  es.each do |time, record|
    begin
      payload = MultiJson.dump(tag: tag, time: time, record: record)
      # NOTE(review): the `false` flag presumably asks the binding to yield raw
      # JSON text instead of parsed values — confirm against the ruby-jq API.
      @jq_filter.update(payload, false) do |raw_result|
        # Wrapping the text in brackets before parsing and taking the first
        # element yields the decoded value; since the filter could return an
        # array, flattening turns it into individual records.
        decoded = MultiJson.load("[#{raw_result}]").first
        [decoded].flatten.each do |item|
          transformed.add(time, item)
        end
      end
    rescue JQ::Error => e
      log.error "Failed to transform #{MultiJson.dump record} with #{@jq}, error: #{e.message}"
    end
  end

  prefix_pattern = /^#{Regexp.escape(@remove_tag_prefix)}\./
  router.emit_stream(tag.sub(prefix_pattern, ''), transformed)
end