Class: Fluent::Plugin::MultiOutput
Constant Summary
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
#log
Attributes inherited from Base
#under_plugin_development
Instance Method Summary
collapse
included
included
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#acquire_worker_lock, #after_shutdown?, #after_started?, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminated?
#system_config, #system_config_override
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
Returns a new instance of MultiOutput.
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/fluent/plugin/multi_output.rb', line 43
# Builds a MultiOutput that has no child outputs yet and whose metrics
# handles are all unset.
def initialize
  super

  # Child output plugins, plus the flag flipped by #static_outputs.
  @outputs = []
  @outputs_statically_created = false

  @counter_mutex = Mutex.new

  # Metrics handles stay nil until #configure creates them.
  @num_errors_metrics = @emit_count_metrics = @emit_records_metrics = @emit_size_metrics = nil
  @enable_size_metrics = false
end
|
Instance Attribute Details
#outputs ⇒ Object
Returns the value of attribute outputs.
37
38
39
|
# File 'lib/fluent/plugin/multi_output.rb', line 37
# Returns the array of child output plugin instances held by this plugin
# (empty after construction; #configure appends one entry per store).
def outputs
@outputs
end
|
#outputs_statically_created ⇒ Object
Returns the value of attribute outputs_statically_created.
37
38
39
|
# File 'lib/fluent/plugin/multi_output.rb', line 37
# Returns the flag that starts out false and is set to true by #static_outputs.
# While it is true, #call_lifecycle_method does nothing.
def outputs_statically_created
@outputs_statically_created
end
|
Instance Method Details
#after_shutdown ⇒ Object
166
167
168
169
|
# File 'lib/fluent/plugin/multi_output.rb', line 166
# Lifecycle hook: runs the inherited after_shutdown, then relays :after_shutdown
# to each child output whose after_shutdown? is still false. The relay is skipped
# when the outputs were handed out through #static_outputs.
def after_shutdown
super
call_lifecycle_method(:after_shutdown, :after_shutdown?)
end
|
#after_start ⇒ Object
146
147
148
149
|
# File 'lib/fluent/plugin/multi_output.rb', line 146
# Lifecycle hook: runs the inherited after_start, then relays :after_start to
# each child output whose after_started? is still false. The relay is skipped
# when the outputs were handed out through #static_outputs.
def after_start
super
call_lifecycle_method(:after_start, :after_started?)
end
|
#before_shutdown ⇒ Object
156
157
158
159
|
# File 'lib/fluent/plugin/multi_output.rb', line 156
# Lifecycle hook: runs the inherited before_shutdown, then relays :before_shutdown
# to each child output whose before_shutdown? is still false. The relay is skipped
# when the outputs were handed out through #static_outputs.
def before_shutdown
super
call_lifecycle_method(:before_shutdown, :before_shutdown?)
end
|
#call_lifecycle_method(method_name, checker_name) ⇒ Object
When MultiOutput plugins are created dynamically (by the forest plugin or others), the agent cannot find their sub-plugins, so the child plugins’ lifecycles MUST be controlled by the MultiOutput plugin itself. TODO: this hack will be removed in v2.
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/fluent/plugin/multi_output.rb', line 128
# Relays one lifecycle call to every child output, unless the children were
# handed out through #static_outputs (in which case nothing is done here).
#
# A child is only invoked when its checker predicate returns a falsy value.
# Any exception raised while handling one child is logged as a warning and
# the loop moves on to the next child.
#
# @param method_name [Symbol] lifecycle method to invoke on each child, e.g. :start
# @param checker_name [Symbol] predicate reporting whether the child already ran it, e.g. :started?
def call_lifecycle_method(method_name, checker_name)
  return if @outputs_statically_created

  @outputs.each do |child|
    begin
      log.debug(
        "calling #{method_name} on output plugin dynamically created",
        type: Fluent::Plugin.lookup_type_from_class(child.class),
        plugin_id: child.plugin_id
      )
      already_done = child.__send__(checker_name)
      child.__send__(method_name) unless already_done
    rescue Exception => error
      # Intentionally broad: a failure in one child is reported, not propagated,
      # so the remaining children still receive the call.
      log.warn(
        "unexpected error while calling #{method_name} on output plugin dynamically created",
        plugin: child.class,
        plugin_id: child.plugin_id,
        error: error
      )
      log.warn_backtrace
    end
  end
end
|
#close ⇒ Object
171
172
173
174
|
# File 'lib/fluent/plugin/multi_output.rb', line 171
# Lifecycle hook: runs the inherited close, then relays :close to each child
# output whose closed? is still false. The relay is skipped when the outputs
# were handed out through #static_outputs.
def close
super
call_lifecycle_method(:close, :closed?)
end
|
#configure(conf) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/multi_output.rb', line 90
# Configures the plugin: creates its four metrics, reads the size-metrics
# switch from the system config, and builds one configured child output per
# entry in @stores.
#
# @param conf the configuration for this plugin; passed on to the inherited configure
# @raise [Fluent::ConfigError] when a store's config element has no '@type'
def configure(conf)
  super

  @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors")
  # The emit counter gets its own name ("emit_count", the same key #statistics
  # reports it under) instead of reusing "emit_records", so the two metrics
  # cannot be confused or collide in a metrics backend.
  @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_count", help_text: "Number of count emits")
  @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records")
  @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events")
  # Coerce to a strict boolean; #emit_sync checks this flag on every call.
  @enable_size_metrics = !!system_config.enable_size_metrics

  # NOTE(review): @stores is not assigned in this method; presumably it is
  # populated from the <store> sections by the inherited configure - confirm.
  @stores.each do |store|
    store_conf = store.corresponding_config_element
    type = store_conf['@type']
    raise Fluent::ConfigError, "Missing '@type' parameter in <store> section" unless type

    log.debug "adding store", type: type

    output = Fluent::Plugin.new_output(type)
    # Children share this plugin's router context.
    output.context_router = self.context_router
    output.configure(store_conf)
    @outputs << output
  end
end
|
#emit_count ⇒ Object
63
64
65
|
# File 'lib/fluent/plugin/multi_output.rb', line 63
# Returns the current value of the emit-count metric, which #emit_sync
# increments once per call. The metric is nil until #configure has run.
def emit_count
@emit_count_metrics.get
end
|
#emit_records ⇒ Object
71
72
73
|
# File 'lib/fluent/plugin/multi_output.rb', line 71
# Returns the current value of the emit-records metric, to which #emit_sync
# adds the size of each processed event stream. The metric is nil until
# #configure has run.
def emit_records
@emit_records_metrics.get
end
|
#emit_size ⇒ Object
67
68
69
|
# File 'lib/fluent/plugin/multi_output.rb', line 67
# Returns the current value of the emit-size metric. #emit_sync only adds to
# it when size metrics are enabled. The metric is nil until #configure has run.
def emit_size
@emit_size_metrics.get
end
|
#emit_sync(tag, es) ⇒ Object
Also known as:
emit_events
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/fluent/plugin/multi_output.rb', line 181
# Counts the emit, passes the event stream to #process, then records how many
# records were handled and, when size metrics are enabled, how many bytes the
# stream's msgpack form takes.
#
# Any StandardError raised after the emit has been counted bumps the error
# metric and is re-raised to the caller unchanged.
#
# @param tag the tag passed through to #process
# @param es the event stream; must respond to #size (and #to_msgpack_stream
#   when size metrics are enabled)
def emit_sync(tag, es)
  @emit_count_metrics.inc

  begin
    process(tag, es)
    @emit_records_metrics.add(es.size)
    if @enable_size_metrics
      @emit_size_metrics.add(es.to_msgpack_stream.bytesize)
    end
  rescue
    @num_errors_metrics.inc
    raise
  end
end
|
#multi_output? ⇒ Boolean
86
87
88
|
# File 'lib/fluent/plugin/multi_output.rb', line 86
# Always true: identifies this plugin as a multi-output plugin.
def multi_output?
true
end
|
#num_errors ⇒ Object
59
60
61
|
# File 'lib/fluent/plugin/multi_output.rb', line 59
# Returns the current value of the error metric, which #emit_sync increments
# whenever it rescues an error. The metric is nil until #configure has run.
def num_errors
@num_errors_metrics.get
end
|
#process(tag, es) ⇒ Object
39
40
41
|
# File 'lib/fluent/plugin/multi_output.rb', line 39
# Placeholder that concrete plugins must override; this version always raises.
# #emit_sync calls it with the same tag and event stream it was given.
#
# @raise [NotImplementedError] always
def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
|
#shutdown ⇒ Object
161
162
163
164
|
# File 'lib/fluent/plugin/multi_output.rb', line 161
# Lifecycle hook: runs the inherited shutdown, then relays :shutdown to each
# child output whose shutdown? is still false. The relay is skipped when the
# outputs were handed out through #static_outputs.
def shutdown
super
call_lifecycle_method(:shutdown, :shutdown?)
end
|
#start ⇒ Object
141
142
143
144
|
# File 'lib/fluent/plugin/multi_output.rb', line 141
# Lifecycle hook: runs the inherited start, then relays :start to each child
# output whose started? is still false. The relay is skipped when the outputs
# were handed out through #static_outputs.
def start
super
call_lifecycle_method(:start, :started?)
end
|
#static_outputs ⇒ Object
115
116
117
118
|
# File 'lib/fluent/plugin/multi_output.rb', line 115
# Returns the child outputs and, as a side effect, marks them as statically
# created. After this call #call_lifecycle_method returns without touching
# the children.
def static_outputs
@outputs_statically_created = true
@outputs
end
|
#statistics ⇒ Object
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/fluent/plugin/multi_output.rb', line 75
# Snapshots the four metrics into a nested hash.
#
# @return [Hash] { 'multi_output' => { 'num_errors', 'emit_records',
#   'emit_count', 'emit_size' } } with each metric's current value
def statistics
  {
    'multi_output' => {
      'num_errors' => @num_errors_metrics.get,
      'emit_records' => @emit_records_metrics.get,
      'emit_count' => @emit_count_metrics.get,
      'emit_size' => @emit_size_metrics.get,
    },
  }
end
|
#stop ⇒ Object
151
152
153
154
|
# File 'lib/fluent/plugin/multi_output.rb', line 151
# Lifecycle hook: runs the inherited stop, then relays :stop to each child
# output whose stopped? is still false. The relay is skipped when the outputs
# were handed out through #static_outputs.
def stop
super
call_lifecycle_method(:stop, :stopped?)
end
|
#terminate ⇒ Object
176
177
178
179
|
# File 'lib/fluent/plugin/multi_output.rb', line 176
# Lifecycle hook: runs the inherited terminate, then relays :terminate to each
# child output whose terminated? is still false. The relay is skipped when the
# outputs were handed out through #static_outputs.
def terminate
super
call_lifecycle_method(:terminate, :terminated?)
end
|