Class: Fluent::Plugin::SplitBySizeFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_split_by_size.rb

Defined Under Namespace

Classes: KeyNotFoundError, SkipRecordError

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



50
51
52
53
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 50

# Applies the plugin configuration and derives the per-record size budget.
#
# After the parent class has processed +conf+, the record budget is set to
# +@max_event_size+ minus a fixed headroom, so the id field can be added to a
# record afterwards.
#
# @param conf [Object] configuration handed to the parent +configure+
# @return [Integer] the computed +@max_record_size+
def configure(conf)
  super
  # Bytes held back from the event budget to leave space for the id field.
  id_field_headroom = 200
  @max_record_size = @max_event_size - id_field_headroom
end

#filter_stream(tag, es) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 55

# Splits every event of the stream into records that fit the size budget.
#
# For each event the value stored under +@id_field+ is removed, the rest of
# the record is passed to +split_event+, and the id is written back onto
# every resulting record before it is added to the output stream with the
# original event time.
#
# Events whose id field is missing (or nil), and events for which any step
# raises a StandardError, are not added to the output; they are handed to
# +router.emit_error_event+ together with the raised error.
#
# @param tag [Object] tag of the incoming stream, forwarded with error events
# @param es [#each] event stream yielding +(time, record)+ pairs
# @return [MultiEventStream] stream holding the (possibly split) records
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    begin
      raise KeyNotFoundError.new(@id_field, record) if record[@id_field].nil?

      # Take the id out of a shallow copy rather than the incoming record, so
      # that if splitting fails the error event below still carries the
      # complete original record, id field included.
      payload = record.dup
      id = payload.delete(@id_field)

      split_event(payload).each do |rec|
        rec[@id_field] = id
        new_es.add(time, rec)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end

#size_of_values(record) ⇒ Object



73
74
75
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 73

# Measures how many bytes a record occupies once serialized with Yajl.
#
# Despite the method name, the whole record is serialized, so the result
# covers the complete +Yajl.dump+ output and not only the values.
#
# @param record [Object] the record to measure
# @return [Integer] byte length of the serialized record
def size_of_values(record)
  serialized = Yajl.dump(record)
  serialized.bytesize
end

#split_event(record) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 77

# Recursively breaks a record into pieces that each fit +@max_record_size+.
#
# A record whose measured size is within the limit is returned as the only
# element of the result. A larger record with more than one key is divided
# with +split_into(2)+ and each part is processed again. A larger record
# that cannot be divided further is logged as a warning and dropped.
#
# @param record [Hash] the record to split
# @return [Array<Hash>] records within the size limit; empty when the
#   record had to be dropped
def split_event(record)
  return [record] unless size_of_values(record) > @max_record_size

  # One key (or none) left: there is nothing smaller to split into.
  if record.keys.size <= 1
    key, value = record.first
    log.warn(format('Key/Value pair is too large: "%s:%s". Max size is: %s, dropping!', key, value, @max_record_size))
    return []
  end

  record.split_into(2).flat_map { |part| split_event(part) }
end