Class: Fluent::EngineClass
Constant Summary
collapse
- MAINLOOP_SLEEP_INTERVAL =
0.3
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_plugin_dir(dir) ⇒ Object
-
#cancel_source_only! ⇒ Object
-
#configure(conf) ⇒ Object
-
#emit(tag, time, record) ⇒ Object
-
#emit_array(tag, array) ⇒ Object
-
#emit_stream(tag, es) ⇒ Object
-
#flush! ⇒ Object
-
#init(system_config, supervisor_mode: false, start_in_parallel: false) ⇒ Object
-
#initialize ⇒ EngineClass
constructor
A new instance of EngineClass.
-
#log ⇒ Object
-
#now ⇒ Object
-
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
-
#push_log_event(tag, time, record) ⇒ Object
-
#reload_config(conf, supervisor: false) ⇒ Object
-
#run ⇒ Object
-
#run_configure(conf, dry_run: false) ⇒ Object
-
#stop ⇒ Object
-
#worker_id ⇒ Object
#deprecated_log, #msgpack_factory, #msgpack_packer, #msgpack_unpacker
Constructor Details
Returns a new instance of EngineClass.
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/fluent/engine.rb', line 33
# Builds an engine in its unconfigured state: no root agent, no log event
# router, every boolean flag off, a default SystemConfig, and a fresh mutex
# for guarding access to the root agent.
def initialize
  # Filled in later: @root_agent by #init, @fluent_log_event_router by
  # #configure, and @_worker_id lazily by #worker_id.
  @root_agent = @fluent_log_event_router = @_worker_id = nil

  # All boolean flags start out false; #init overrides some of them.
  @engine_stopped = @log_event_verbose = @suppress_config_dump = false
  @without_source = @supervisor_mode = false

  @system_config = SystemConfig.new
  @root_agent_mutex = Mutex.new
end
|
Instance Attribute Details
#root_agent ⇒ Object
Returns the value of attribute root_agent.
52
53
54
|
# File 'lib/fluent/engine.rb', line 52
# Reader for the engine's current root agent.
#
# @return [Object, nil] the root agent; nil until #init assigns one
def root_agent
@root_agent
end
|
#supervisor_mode ⇒ Object
Returns the value of attribute supervisor_mode.
52
53
54
|
# File 'lib/fluent/engine.rb', line 52
# Reader for the supervisor-mode flag.
#
# @return [Boolean] false after construction; #init stores its
#   supervisor_mode: argument here
def supervisor_mode
@supervisor_mode
end
|
#system_config ⇒ Object
Returns the value of attribute system_config.
52
53
54
|
# File 'lib/fluent/engine.rb', line 52
# Reader for the system configuration in use.
#
# @return [Object] a default SystemConfig after construction, or the object
#   handed to #init once that has been called
def system_config
@system_config
end
|
Instance Method Details
#add_plugin_dir(dir) ⇒ Object
120
121
122
123
|
# File 'lib/fluent/engine.rb', line 120
# Deprecated: logs a deprecation warning through $log and then forwards
# the directory to Plugin.add_plugin_dir.
#
# @param dir [Object] plugin directory, passed through unchanged
# @return [Object] whatever Plugin.add_plugin_dir returns
def add_plugin_dir(dir)
  deprecation = 'Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir'
  $log.warn(deprecation)

  Plugin.add_plugin_dir(dir)
end
|
#cancel_source_only! ⇒ Object
143
144
145
146
147
|
# File 'lib/fluent/engine.rb', line 143
# Forwards cancel_source_only! to the root agent while holding
# @root_agent_mutex, the same lock #reload_config takes when it swaps agents.
#
# @return [Object] whatever the root agent's cancel_source_only! returns
def cancel_source_only!
  @root_agent_mutex.synchronize { @root_agent.cancel_source_only! }
end
|
#configure(conf) ⇒ Object
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/fluent/engine.rb', line 106
# Applies +conf+ to the root agent, builds the log event router for that
# agent, and turns on log events when the router reports it can emit them.
# Unless config dumps are suppressed, the configuration text is logged.
#
# @param conf [Object] configuration; passed to the root agent and rendered
#   with to_s for the log line
# @return [Object, nil] the result of the $log.info call, or nil when the
#   config dump is suppressed
def configure(conf)
  @root_agent.configure(conf)

  router = FluentLogEventRouter.build(@root_agent)
  @fluent_log_event_router = router
  $log.enable_event(true) if router.emittable?

  $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}" unless @suppress_config_dump
end
|
#emit(tag, time, record) ⇒ Object
125
126
127
|
# File 'lib/fluent/engine.rb', line 125
# Always fails: the message directs callers to router.emit instead.
# The arguments are ignored.
#
# @raise [RuntimeError] unconditionally
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end
|
#emit_array(tag, array) ⇒ Object
129
130
131
|
# File 'lib/fluent/engine.rb', line 129
# Always fails: the message directs callers to router.emit_array instead.
# The arguments are ignored.
#
# @raise [RuntimeError] unconditionally
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end
|
#emit_stream(tag, es) ⇒ Object
133
134
135
|
# File 'lib/fluent/engine.rb', line 133
# Always fails: the message directs callers to router.emit_stream instead.
# The arguments are ignored.
#
# @raise [RuntimeError] unconditionally
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end
|
#flush! ⇒ Object
137
138
139
140
141
|
# File 'lib/fluent/engine.rb', line 137
# Forwards flush! to the root agent while holding @root_agent_mutex, the
# same lock #reload_config takes when it swaps agents.
#
# @return [Object] whatever the root agent's flush! returns
def flush!
  @root_agent_mutex.synchronize { @root_agent.flush! }
end
|
#init(system_config, supervisor_mode: false, start_in_parallel: false) ⇒ Object
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/fluent/engine.rb', line 54
# Stores the system configuration and mode flags, then builds the root agent.
#
# @param system_config [Object] must respond to suppress_config_dump,
#   without_source and log_event_verbose
# @param supervisor_mode [Boolean] stored as-is in @supervisor_mode
# @param start_in_parallel [Boolean] forwarded to RootAgent.new
# @return [self]
def init(system_config, supervisor_mode: false, start_in_parallel: false)
  @system_config = system_config
  @supervisor_mode = supervisor_mode

  # A nil setting leaves the value the engine already holds untouched.
  unless system_config.suppress_config_dump.nil?
    @suppress_config_dump = system_config.suppress_config_dump
  end
  unless system_config.without_source.nil?
    @without_source = system_config.without_source
  end
  unless system_config.log_event_verbose.nil?
    @log_event_verbose = system_config.log_event_verbose
  end

  @root_agent = RootAgent.new(
    log: log,
    system_config: @system_config,
    start_in_parallel: start_in_parallel
  )

  self
end
|
#log ⇒ Object
68
69
70
|
# File 'lib/fluent/engine.rb', line 68
# Accessor for the logger held in the $log global.
#
# @return [Object] the current value of $log
def log
$log
end
|
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
72
73
74
75
76
77
78
79
|
# File 'lib/fluent/engine.rb', line 72
# Parses a configuration source, choosing the parser from the file name.
#
# Names matching /\.rb$/ go through the Ruby DSL parser, which is required
# only when needed; all other names go through Config.parse.
#
# @param io [Object] configuration source handed to the chosen parser
# @param fname [String] file name; decides which parser is used
# @param basepath [String] base directory (defaults to Dir.pwd)
# @param v1_config [Boolean] forwarded to Config.parse only
# @return [Object] whatever the selected parser returns
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  return Config.parse(io, fname, basepath, v1_config) unless /\.rb$/.match?(fname)

  # Lazy-load the DSL so plain config files never pay for it.
  require 'fluent/config/dsl'
  dsl_path = File.join(basepath, fname)
  Config::DSL::Parser.parse(io, dsl_path)
end
|
#push_log_event(tag, time, record) ⇒ Object
225
226
227
|
# File 'lib/fluent/engine.rb', line 225
# Packs the three values into one [tag, time, record] array and hands it to
# the log event router's emit_event.
#
# @param tag [Object] first element of the event array
# @param time [Object] second element of the event array
# @param record [Object] third element of the event array
# @return [Object] whatever the router's emit_event returns
def push_log_event(tag, time, record)
  event = [tag, time, record]
  @fluent_log_event_router.emit_event(event)
end
|
#reload_config(conf, supervisor: false) ⇒ Object
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/fluent/engine.rb', line 181
# Swaps the running configuration for +conf+ while holding @root_agent_mutex.
#
# Steps, in order:
# 1. Build a new RootAgent and statically analyse +conf+; reject the reload
#    if any plugin responds to reloadable_plugin? with false.
# 2. Install the new agent as @root_agent and configure it; if configuring
#    raises, put the old agent back and re-raise.
# 3. Log the configuration unless config dumps are suppressed.
# 4. With supervisor: true, shut the old agent down and return. Otherwise
#    run the stop phase on the old agent and the start phase on the new one.
#
# @param conf [Object] configuration to load
# @param supervisor [Boolean] when true, skip the stop/start phases and only
#   shut down the old agent
# @return [Object, nil] nil in supervisor mode, otherwise the result of
#   start_phase
# @raise [Fluent::ConfigError] when a plugin reports it cannot be reloaded
def reload_config(conf, supervisor: false)
  @root_agent_mutex.synchronize do
    new_agent = RootAgent.new(log: log, system_config: @system_config)
    ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

    ret.all_plugins.each do |plugin|
      if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
        raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
      end
    end

    # The new agent is installed before it is configured; on failure the
    # previous agent is restored so the engine keeps its old configuration.
    old_agent, @root_agent = @root_agent, new_agent
    begin
      @root_agent.configure(conf)
    rescue
      @root_agent = old_agent
      raise
    end

    unless @suppress_config_dump
      $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
    end

    if supervisor
      # Two separate statements: shut the old agent down, then leave the
      # method without running the worker stop/start phases.
      old_agent.shutdown
      return
    end

    stop_phase(old_agent)
    $log.info 'restart fluentd worker', worker: worker_id
    start_phase(new_agent)
  end
end
|
#run ⇒ Object
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/fluent/engine.rb', line 154
# Worker main loop: starts under the root-agent mutex, starts the log event
# router, then sleeps in MAINLOOP_SLEEP_INTERVAL steps until @engine_stopped
# is set. Afterwards the stop phase runs on the root agent, again under the
# mutex.
#
# Anything raised during startup or the wait loop is logged and re-raised;
# the stop phase is then skipped.
#
# @return [Object] whatever stop_phase returns
def run
  begin
    $log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
    @root_agent_mutex.synchronize { start }
    @fluent_log_event_router.start
    $log.info "fluentd worker is now running", worker: worker_id

    # Poll the stop flag; #stop flips it.
    until @engine_stopped
      sleep MAINLOOP_SLEEP_INTERVAL
    end

    $log.info "fluentd worker is now stopping", worker: worker_id
  rescue Exception => e
    # Deliberately broad: the error is only logged here and then re-raised.
    $log.error "unexpected error", error: e
    $log.error_backtrace
    raise
  end

  @root_agent_mutex.synchronize { stop_phase(@root_agent) }
end
|
#run_configure(conf, dry_run: false) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fluent/engine.rb', line 81
# Configures the engine with +conf+ and then warns about every section or
# parameter that conf.check_not_fetched reports as unused.
#
# The warning text depends on what the element's unused_in returns (parent
# section and plugin, parent only, or neither). Elements named 'system', and
# 'source' elements when @without_source is set, produce no parameter warning.
#
# @param conf [Object] configuration; must respond to check_not_fetched
# @param dry_run [Boolean] together with supervisor mode, tags the warnings
#   with :supervisor
# @return [Object] whatever conf.check_not_fetched returns
def run_configure(conf, dry_run: false)
  configure(conf)

  conf.check_not_fetched do |key, e|
    parent_name, plugin_name = e.unused_in

    message =
      if parent_name && plugin_name
        "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
      elsif parent_name
        "section <#{e.name}> is not used in <#{parent_name}>"
      elsif e.name == 'system' || (@without_source && e.name == 'source')
        nil
      else
        "parameter '#{key}' in #{e.to_s.strip} is not used."
      end
    next unless message

    # Pick the argument list for $log.warn; nil means nothing is logged.
    warn_args =
      if dry_run && @supervisor_mode
        [:supervisor, message]
      elsif e.for_every_workers?
        [:worker0, message]
      elsif e.for_this_worker?
        [message]
      end
    $log.warn(*warn_args) if warn_args
  end
end
|
#stop ⇒ Object
220
221
222
223
|
# File 'lib/fluent/engine.rb', line 220
# Requests shutdown by setting @engine_stopped; the wait loop in #run polls
# this flag and moves on to its stop phase once it is true.
#
# @return [nil] always nil, returned explicitly
def stop
@engine_stopped = true
nil
end
|
#worker_id ⇒ Object
229
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/fluent/engine.rb', line 229
# Identifies this worker.
#
# In supervisor mode the answer is always -1. Otherwise the id is read once
# from the SERVERENGINE_WORKER_ID environment variable (0 when unset),
# converted with to_i, and cached in @_worker_id for later calls.
#
# @return [Integer] -1 for the supervisor, otherwise the cached worker id
def worker_id
  return -1 if @supervisor_mode

  # Integers are always truthy in Ruby, so a cached 0 is never recomputed.
  @_worker_id ||= (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
end
|