Class: Fluent::Plugin::DockerJournaldConcatFilter
- Inherits: Filter
  - Object
    - Filter
      - Fluent::Plugin::DockerJournaldConcatFilter
- Defined in:
- lib/fluent/plugin/filter_docker_journald_concat.rb
Defined Under Namespace
Classes: TimeoutError
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #filter_stream(tag, es) ⇒ Object
- #initialize ⇒ DockerJournaldConcatFilter (constructor): A new instance of DockerJournaldConcatFilter.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ DockerJournaldConcatFilter
Returns a new instance of DockerJournaldConcatFilter.
23 24 25 26 27 28 |
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 23
#
# Builds the filter and prepares its two per-key lookup tables.
#
# @buffer      - Hash whose missing keys are lazily initialised to a fresh,
#                empty Array (one Array per key, never shared).
# @timeout_map - Hash whose missing keys are lazily initialised to the value
#                of Fluent::Engine.now at the moment of first access.
def initialize
  super
  @buffer = Hash.new { |store, key| store[key] = [] }
  @timeout_map = Hash.new { |store, key| store[key] = Fluent::Engine.now }
end
Instance Method Details
#configure(conf) ⇒ Object
30 31 32 33 34 |
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 30
#
# Applies the plugin configuration via the parent class, then fixes the
# separator to the empty string.
#
# conf - the configuration object handed to the parent's #configure.
def configure(conf)
  super

  # The separator is not configurable here: it is always the empty string.
  @separator = ""
end
#filter_stream(tag, es) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 50
#
# Runs every event of an incoming stream through #process and collects the
# results into a new Fluent::MultiEventStream.
#
# Events whose tag is one of fluent.trace / debug / info / warn / error /
# fatal are copied to the output unchanged. For every other event, each
# record returned by #process is merged over the incoming record and added
# to the output with the incoming event's time. If #process (or the merge)
# raises a StandardError, the incoming event is sent to
# router.emit_error_event and nothing is added for it.
#
# tag - the tag shared by all events in +es+.
# es  - the event stream to filter; must respond to #each yielding
#       (time, record) pairs.
#
# Returns the newly built Fluent::MultiEventStream.
def filter_stream(tag, es)
  output = Fluent::MultiEventStream.new

  # The tag is the same for every event in the stream, so classify it once.
  passthrough = !!(/\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag)

  es.each do |time, record|
    if passthrough
      output.add(time, record)
      next
    end

    begin
      flushed = process(tag, time, record)
      next if flushed.empty?

      # The flushed event's own time is ignored; the incoming event's time
      # is used for every emitted record.
      flushed.each do |_flushed_time, flushed_record|
        output.add(time, record.merge(flushed_record))
      end
    rescue => error
      router.emit_error_event(tag, time, record, error)
    end
  end

  output
end
#shutdown ⇒ Object
43 44 45 46 47 48 |
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 43
#
# Shuts the filter down: runs the parent's shutdown, marks the filter as
# finished, then calls flush_shutdown_buffer (defined elsewhere in this
# class).
def shutdown
  super

  # Set the flag before flushing; the order of these two steps is kept
  # exactly as written in the original listing.
  @finished = true
  flush_shutdown_buffer
end
#start ⇒ Object
36 37 38 39 40 41 |
# File 'lib/fluent/plugin/filter_docker_journald_concat.rb', line 36
#
# Starts the filter: runs the parent's start, clears the finished flag, and
# registers a timer named :filter_concat_timer that invokes #on_timer.
def start
  super

  @finished = false

  # The interval argument is 1 — presumably seconds; confirm against the
  # timer helper's documentation.
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end