Class: Fluent::Plugin::CopyOutput
- Inherits:
-
MultiOutput
- Object
- Fluent::Plugin::CopyOutput
- Defined in:
- lib/fluent/plugin/out_copy.rb
Instance Attribute Summary collapse
-
#ignore_errors ⇒ Object
readonly
Returns the value of attribute ignore_errors.
-
#ignore_if_prev_successes ⇒ Object
readonly
Returns the value of attribute ignore_if_prev_successes.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ CopyOutput
constructor
A new instance of CopyOutput.
- #multi_workers_ready? ⇒ Boolean
- #process(tag, es) ⇒ Object
Constructor Details
#initialize ⇒ CopyOutput
Returns a new instance of CopyOutput.
32 33 34 35 36 |
# File 'lib/fluent/plugin/out_copy.rb', line 32 def initialize super @ignore_errors = [] @ignore_if_prev_successes = [] end |
Instance Attribute Details
#ignore_errors ⇒ Object (readonly)
Returns the value of attribute ignore_errors.
30 31 32 |
# File 'lib/fluent/plugin/out_copy.rb', line 30 def ignore_errors @ignore_errors end |
#ignore_if_prev_successes ⇒ Object (readonly)
Returns the value of attribute ignore_if_prev_successes.
30 31 32 |
# File 'lib/fluent/plugin/out_copy.rb', line 30 def ignore_if_prev_successes @ignore_if_prev_successes end |
Instance Method Details
#configure(conf) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/out_copy.rb', line 38 def configure(conf) super @copy_proc = gen_copy_proc @stores.each_with_index { |store, i| if i == 0 && store.arg.include?('ignore_if_prev_success') raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives" end @ignore_errors << (store.arg.include?('ignore_error')) @ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success')) } if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && !@ignore_if_prev_successes.include?(true) log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?" end end |
#multi_workers_ready? ⇒ Boolean
54 55 56 |
# File 'lib/fluent/plugin/out_copy.rb', line 54 def multi_workers_ready? true end |
#process(tag, es) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_copy.rb', line 58 def process(tag, es) unless es.repeatable? m = Fluent::MultiEventStream.new es.each {|time,record| m.add(time, record) } es = m end success = Array.new(outputs.size) outputs.each_with_index do |output, i| begin if i > 0 && success[i - 1] && @ignore_if_prev_successes[i] log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i else output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es) success[i] = true end rescue => e if @ignore_errors[i] log.error "ignore emit error in #{output.plugin_id}", error: e else raise e end end end end |