Class: Fluent::Test::Driver::Output

Inherits:
BaseOwner show all
Includes:
EventFeeder
Defined in:
lib/fluent/test/driver/output.rb

Constant Summary

Constants inherited from Base

Base::DEFAULT_TIMEOUT

Instance Attribute Summary

Attributes inherited from Base

#instance, #logs

Instance Method Summary collapse

Methods included from EventFeeder

#feed, #feed_to_plugin

Methods inherited from BaseOwner

#configure, #emit_count, #emit_error_event, #emit_event_stream, #error_events, #event_streams, #events, #record_count

Methods inherited from Base

#break_if, #broken?, #configure, #end_if, #instance_hook_before_stopped, #instance_shutdown, #instance_start, #stop?

Constructor Details

#initialize(klass, opts: {}, &block) ⇒ Output

Returns a new instance of Output.

Raises:

  • (ArgumentError)


29
30
31
32
33
34
35
36
37
# File 'lib/fluent/test/driver/output.rb', line 29

def initialize(klass, opts: {}, &block)
  super
  raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output
  @flush_buffer_at_cleanup = nil
  @wait_flush_completion = nil
  @force_flush_retry = nil
  @format_hook = nil
  @format_results = []
end

Instance Method Details

#flushObject



63
64
65
66
# File 'lib/fluent/test/driver/output.rb', line 63

def flush
  @instance.force_flush
  wait_flush_completion if @wait_flush_completion
end

#formattedObject



59
60
61
# File 'lib/fluent/test/driver/output.rb', line 59

def formatted
  @format_results
end

#instance_hook_after_startedObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/test/driver/output.rb', line 83

def instance_hook_after_started
  super

  # it's decided after #start whether output plugin instances use @custom_format or not.
  if @instance.instance_eval{ @custom_format }
    @format_hook = format_hook = ->(result){ @format_results << result }
    m = Module.new do
      define_method(:format) do |tag, time, record|
        result = super(tag, time, record)
        format_hook.call(result)
        result
      end
    end
    @instance.singleton_class.prepend m
  end
end

#run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block) ⇒ Object



39
40
41
42
43
44
# File 'lib/fluent/test/driver/output.rb', line 39

def run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)
  @flush_buffer_at_cleanup = flush
  @wait_flush_completion = wait_flush_completion
  @force_flush_retry = force_flush_retry
  super(**kwargs, &block)
end

#run_actual(**kwargs, &block) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/test/driver/output.rb', line 46

def run_actual(**kwargs, &block)
  if @force_flush_retry
    @instance.retry_for_error_chunk = true
  end
  val = super(**kwargs, &block)
  if @flush_buffer_at_cleanup
    self.flush
  end
  val
ensure
  @instance.retry_for_error_chunk = false
end

#wait_flush_completionObject



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/test/driver/output.rb', line 68

def wait_flush_completion
  buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 }
  dequeued_chunks = ->(){
    @instance.dequeued_chunks_mutex &&
    @instance.dequeued_chunks &&
    @instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 }
  }

  Timeout.timeout(10) do
    while buffer_queue.call || dequeued_chunks.call
      sleep 0.1
    end
  end
end