Class: Fluent::Plugin::DockerJournaldConcatFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_docker_journald_concat.rb

Defined Under Namespace

Classes: TimeoutError

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DockerJournaldConcatFilter

Returns a new instance of DockerJournaldConcatFilter.



23
24
25
26
27
28
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 23

# Sets up the filter's per-key state after running the parent initializer.
#
# @buffer      - Hash whose missing keys are populated with a fresh empty Array.
# @timeout_map - Hash whose missing keys are populated with the value of
#                Fluent::Engine.now at the moment of first access.
def initialize
  super

  @buffer = Hash.new do |store, key|
    store[key] = []
  end

  @timeout_map = Hash.new do |store, key|
    store[key] = Fluent::Engine.now
  end
end

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 30

# Runs the parent class's configuration step, then sets @separator to the
# empty string.
#
# conf - configuration object, forwarded unchanged to the parent via `super`.
def configure(conf)
  super

  # Hard-coded here rather than read from conf; presumably the string placed
  # between concatenated fragments — confirm against the code that reads it.
  @separator = ""
end

#filter_stream(tag, es) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 50

# Builds a new event stream from +es+ by running each record through +process+.
#
# Streams whose tag has the form "fluent.<level>" (trace, debug, info, warn,
# error or fatal) are copied through untouched and never reach +process+.
# For every other tag, each event yielded by +process+ is merged over the
# current record and added to the result with the current record's time
# (the time yielded by +process+ is ignored). A record for which +process+
# yields nothing contributes nothing to the result.
#
# tag - tag of the incoming stream.
# es  - incoming event stream; must yield (time, record) pairs from #each.
#
# Returns a new Fluent::MultiEventStream.
# Any StandardError raised while handling a record is not propagated; it is
# reported through router.emit_error_event with that record.
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  # The tag is the same for every record in the stream, so match it once
  # instead of once per record.
  passthrough = /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag

  es.each do |time, record|
    if passthrough
      new_es.add(time, record)
      next
    end

    begin
      # Iterating an empty stream is a no-op, so no separate emptiness check
      # is needed.
      process(tag, time, record).each do |_time, new_record|
        new_es.add(time, record.merge(new_record))
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end

#shutdown ⇒ Object



43
44
45
46
47
48
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 43

# Runs the parent class's shutdown step, marks the filter as finished, and
# calls flush_shutdown_buffer.
def shutdown
  super

  # NOTE(review): the parent's shutdown has already run at this point —
  # confirm that flush_shutdown_buffer (defined elsewhere) needs nothing the
  # parent tears down.
  # The flag is set before flushing; presumably the timer callback checks it —
  # TODO confirm.
  @finished = true
  flush_shutdown_buffer
end

#start ⇒ Object



36
37
38
39
40
41
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 36

# Runs the parent class's start step, clears the finished flag, and registers
# a timer named :filter_concat_timer whose callback is the on_timer method.
def start
  super

  @finished = false
  # The second argument (1) is the timer interval — presumably seconds; confirm
  # against the timer helper's documentation. on_timer is defined elsewhere.
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end