Class: Fluent::UnitTimeFilterOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::UnitTimeFilterOutput
- Defined in:
- lib/fluent/plugin/unit_time_filter_buffer.rb,
lib/fluent/plugin/out_unit_time_filter.rb
Defined Under Namespace
Classes: Buffer
Constant Summary collapse
- BUFFER_KEY =
:unit_time_filter_buffer
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 20 def configure(conf) super unless File.exist?(@filter_path) raise Fluent::ConfigError, "No such file: #{@filter_path}" end begin @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path) rescue => e raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}" end unless @filter.respond_to?(:call) raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}" end end |
#emit(tag, es, chain) ⇒ Object
70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 70 def emit(tag, es, chain) use_buffer do |buffer| buffer.resume(tag, es) end chain.next end |
#shutdown ⇒ Object
38 39 40 41 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 38 def shutdown super Thread.current[BUFFER_KEY] = nil end |
#use_buffer ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 43 def use_buffer buf = Thread.current[BUFFER_KEY] unless buf buf = Buffer.new({ :filter => @filter, :unit_sec => @unit_sec, :prefix => @prefix, :emit_each_tag => @emit_each_tag, :pass_hash_row => @pass_hash_row, :hash_row_time_key => @hash_row_time_key, :hash_row_tag_key => @hash_row_tag_key, :log => log, }) Thread.current[BUFFER_KEY] = buf end begin yield(buf) rescue Exception => e Thread.current[BUFFER_KEY] = nil log.error(e) raise e end end |