Class: Fluent::Plugin::CopyOutput

Inherits:
MultiOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_copy.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeCopyOutput

Returns a new instance of CopyOutput.



32
33
34
35
36
# File 'lib/fluent/plugin/out_copy.rb', line 32

def initialize
  super
  @ignore_errors = []
  @ignore_if_prev_successes = []
end

Instance Attribute Details

#ignore_errorsObject (readonly)

Returns the value of attribute ignore_errors.



30
31
32
# File 'lib/fluent/plugin/out_copy.rb', line 30

def ignore_errors
  @ignore_errors
end

#ignore_if_prev_successesObject (readonly)

Returns the value of attribute ignore_if_prev_successes.



30
31
32
# File 'lib/fluent/plugin/out_copy.rb', line 30

def ignore_if_prev_successes
  @ignore_if_prev_successes
end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_copy.rb', line 38

def configure(conf)
  super

  @copy_proc = gen_copy_proc
  @stores.each_with_index { |store, i|
    if i == 0 && store.arg.include?('ignore_if_prev_success')
      raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives"
    end
    @ignore_errors << (store.arg.include?('ignore_error'))
    @ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success'))
  }
  if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && !@ignore_if_prev_successes.include?(true)
    log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?"
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/fluent/plugin/out_copy.rb', line 54

def multi_workers_ready?
  true
end

#process(tag, es) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_copy.rb', line 58

def process(tag, es)
  unless es.repeatable?
    m = Fluent::MultiEventStream.new
    es.each {|time,record|
      m.add(time, record)
    }
    es = m
  end
  success = Array.new(outputs.size)
  outputs.each_with_index do |output, i|
    begin
      if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
        log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i
      else
        output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
        success[i] = true
      end
    rescue => e
      if @ignore_errors[i]
        log.error "ignore emit error in #{output.plugin_id}", error: e
      else
        raise e
      end
    end
  end
end