Class: Fluent::Plugin::RecordDemuxOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_record_demux.rb

Constant Summary collapse

NAME =
'record_demux'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_record_demux.rb', line 51

# Validates the plugin configuration once the parent class has processed it.
#
# @param conf configuration object, forwarded unchanged to the parent
#   implementation through the implicit `super`
# @return [nil] when `@tag` is set
# @raise [Fluent::ConfigError] if `@tag` is still nil after the parent
#   implementation has run
def configure(conf)
  super

  # The message is prefixed with this plugin's own name so that the failing
  # plugin can be identified from the error text.
  raise Fluent::ConfigError, 'out_record_demux: `tag` must be specified' if @tag.nil?
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/fluent/plugin/out_record_demux.rb', line 59

# Reports whether this plugin may be used when Fluentd runs several workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#process(_tag, es) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_record_demux.rb', line 63

# Splits each record of the event stream into one event per demuxed key and
# emits every resulting record through the router under `@tag`.
#
# For every record:
# * keys returned by `remove_keys` are dropped from consideration;
# * remaining keys that also appear in `@shared_keys` are gathered into a
#   `shared` hash that is handed to every `format` call for that record;
# * the keys to demux are taken from `@demux_keys` when it is non-empty,
#   otherwise every remaining non-shared key is demuxed.
#
# NOTE(review): `@demux_keys`, `@shared_keys`, `@tag` and `remove_keys` are
# presumably populated from the plugin configuration; their declarations are
# outside this listing -- confirm.
#
# @param _tag incoming event tag (unused)
# @param es event stream; must respond to `each`, yielding `(time, record)`
# @return whatever `es.each` returns
def process(_tag, es)
  es.each do |time, record|
    record_keys = record.keys - remove_keys

    shared_keys = record_keys.intersection(@shared_keys)
    shared = record.select { |key, _value| shared_keys.include?(key) }

    # Read the configured list through the instance variable and keep it in a
    # differently named local: a local called `demux_keys` would shadow the
    # configured value inside its own assignment condition and always be nil
    # there, so the configured list would never be honoured.
    keys = @demux_keys
    keys = record_keys - shared_keys if keys.nil? || keys.empty?

    keys.each do |key|
      # Configured keys are not guaranteed to be present in every record.
      next unless record.key?(key)

      new_record = format(time, key, record[key], shared)

      router.emit(@tag, time, new_record)
    end
  end
end