Class: Fluent::Test::Driver::Output
Constant Summary
Constants inherited from Base
Base::DEFAULT_TIMEOUT
Instance Attribute Summary
Attributes inherited from Base
#instance, #logs
Instance Method Summary
collapse
#feed, #feed_to_plugin
Methods inherited from BaseOwner
#configure, #emit_count, #emit_error_event, #emit_event_stream, #error_events, #event_streams, #events, #record_count
Methods inherited from Base
#break_if, #broken?, #configure, #end_if, #instance_hook_before_stopped, #instance_shutdown, #instance_start, #stop?
Constructor Details
#initialize(klass, opts: {}, &block) ⇒ Output
Returns a new instance of Output.
29
30
31
32
33
34
35
36
37
|
# File 'lib/fluent/test/driver/output.rb', line 29
# Builds a test driver for an output plugin.
#
# Every argument (and the block) is forwarded untouched to the parent
# constructor through bare +super+. NOTE(review): the parent is expected to
# have assigned +@instance+ before returning -- confirm in Base.
#
# @param klass forwarded to the parent constructor
# @param opts [Hash] forwarded to the parent constructor
# @raise [ArgumentError] when +@instance+ is not a Fluent::Plugin::Output
def initialize(klass, opts: {}, &block)
  super

  unless @instance.is_a?(Fluent::Plugin::Output)
    raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output"
  end

  # Flush-related switches start unset; #run assigns them on every call.
  @flush_buffer_at_cleanup = @wait_flush_completion = @force_flush_retry = nil

  # State used to record what the plugin's #format returns.
  @format_hook = nil
  @format_results = []
end
|
Instance Method Details
#flush ⇒ Object
63
64
65
66
|
# File 'lib/fluent/test/driver/output.rb', line 63
# Forces the plugin under test to flush, then optionally blocks until the
# flush has finished.
#
# The wait only happens when +@wait_flush_completion+ is truthy.
#
# @return [Object, nil] whatever #wait_flush_completion returns, or nil when
#   waiting is disabled
def flush
  @instance.force_flush
  return unless @wait_flush_completion

  wait_flush_completion
end
|
#formatted ⇒ Object
59
60
61
|
# File 'lib/fluent/test/driver/output.rb', line 59
# Returns the collection held in +@format_results+.
#
# The same object is handed back on every call (no copy is made), so callers
# observe entries appended later.
#
# @return [Array] the accumulated format results
def formatted
  @format_results
end
|
#instance_hook_after_started ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/fluent/test/driver/output.rb', line 83
# Hook run after the plugin instance has started.
#
# When the plugin's +@custom_format+ instance variable is truthy, a module
# overriding #format is prepended to the plugin's singleton class. The
# override delegates to the plugin's own #format and appends each result to
# +@format_results+ before returning it unchanged.
#
# @return [Object, nil] nil when the plugin has no custom format; otherwise
#   the value returned by +prepend+
def instance_hook_after_started
  super
  return unless @instance.instance_eval { @custom_format }

  # The lambda is created here so that @format_results resolves against the
  # driver, not against the plugin the module gets prepended to.
  recorder = ->(formatted_entry) { @format_results << formatted_entry }
  @format_hook = recorder

  capture = Module.new do
    define_method(:format) do |tag, time, record|
      # Implicit-argument super is not allowed inside define_method, so the
      # arguments are passed explicitly.
      formatted_entry = super(tag, time, record)
      recorder.call(formatted_entry)
      formatted_entry
    end
  end

  @instance.singleton_class.prepend(capture)
end
|
#run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block) ⇒ Object
39
40
41
42
43
44
|
# File 'lib/fluent/test/driver/output.rb', line 39
# Stores the flush-related options on the driver, then delegates to the
# parent #run with the remaining keyword arguments and the block.
#
# @param flush [Boolean] stored in +@flush_buffer_at_cleanup+
# @param wait_flush_completion [Boolean] stored in +@wait_flush_completion+
# @param force_flush_retry [Boolean] stored in +@force_flush_retry+
# @param kwargs forwarded to the parent #run
# @return [Object] whatever the parent #run returns
def run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)
  @flush_buffer_at_cleanup, @wait_flush_completion, @force_flush_retry =
    flush, wait_flush_completion, force_flush_retry

  super(**kwargs, &block)
end
|
#run_actual(**kwargs, &block) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/fluent/test/driver/output.rb', line 46
# Wraps the parent #run_actual with flush handling.
#
# Turns on +retry_for_error_chunk+ on the plugin when +@force_flush_retry+ is
# set, runs the parent implementation, and flushes afterwards when
# +@flush_buffer_at_cleanup+ is set. +retry_for_error_chunk+ is always reset
# to false on the way out, including when an exception is raised.
#
# @param kwargs forwarded to the parent #run_actual
# @return [Object] the parent #run_actual's return value
def run_actual(**kwargs, &block)
  @instance.retry_for_error_chunk = true if @force_flush_retry

  outcome = super(**kwargs, &block)
  flush if @flush_buffer_at_cleanup
  outcome
ensure
  @instance.retry_for_error_chunk = false
end
|
#wait_flush_completion ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/fluent/test/driver/output.rb', line 68
# Blocks until the plugin has no queued buffer chunks and no dequeued chunks
# left, polling every 0.1 seconds.
#
# Either check is skipped when the plugin does not provide the corresponding
# object (a nil buffer, or a nil mutex / dequeued-chunk list).
#
# @raise [Timeout::Error] if the plugin is still busy after 10 seconds
# @return [nil]
def wait_flush_completion
  chunks_queued = lambda do
    @instance.buffer && @instance.buffer.queue.size > 0
  end

  # The dequeued-chunk list is only inspected while holding its mutex.
  chunks_in_flight = lambda do
    @instance.dequeued_chunks_mutex &&
      @instance.dequeued_chunks &&
      @instance.dequeued_chunks_mutex.synchronize { @instance.dequeued_chunks.size > 0 }
  end

  Timeout.timeout(10) do
    sleep 0.1 while chunks_queued.call || chunks_in_flight.call
  end
end
|