Class: Fluent::EngineClass
Constant Summary
collapse
- MAINLOOP_SLEEP_INTERVAL =
0.3
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_plugin_dir(dir) ⇒ Object
-
#configure(conf) ⇒ Object
-
#emit(tag, time, record) ⇒ Object
-
#emit_array(tag, array) ⇒ Object
-
#emit_stream(tag, es) ⇒ Object
-
#flush! ⇒ Object
-
#init(system_config, supervisor_mode: false) ⇒ Object
-
#initialize ⇒ EngineClass
constructor
A new instance of EngineClass.
-
#log ⇒ Object
-
#now ⇒ Object
-
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
-
#push_log_event(tag, time, record) ⇒ Object
-
#reload_config(conf, supervisor: false) ⇒ Object
-
#run ⇒ Object
-
#run_configure(conf, dry_run: false) ⇒ Object
-
#stop ⇒ Object
-
#worker_id ⇒ Object
#deprecated_log, #msgpack_factory, #msgpack_packer, #msgpack_unpacker
Constructor Details
Returns a new instance of EngineClass.
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/fluent/engine.rb', line 33
# Builds an engine with no root agent, no log-event router and every mode
# flag switched off. Nothing is started here.
def initialize
@root_agent = nil
# Polled by #run's main loop; #stop flips it to true.
@engine_stopped = false
# Memo slot used by #worker_id.
@_worker_id = nil
# NOTE(review): not read by any method visible in this listing.
@log_event_verbose = false
# When true, #configure and #reload_config skip logging the configuration text.
@suppress_config_dump = false
# When true, #run_configure emits no "parameter ... is not used" warning for
# elements named 'source'.
@without_source = false
# Assigned by #configure from FluentLogEventRouter.build.
@fluent_log_event_router = nil
@system_config = SystemConfig.new
# When true, #worker_id reports -1.
@supervisor_mode = false
end
|
Instance Attribute Details
#root_agent ⇒ Object
Returns the value of attribute root_agent.
50
51
52
|
# File 'lib/fluent/engine.rb', line 50
# Reader for the engine's current root agent.
# @return [Object, nil] the value of @root_agent; nil after #initialize, and
#   replaced by #reload_config when a reload succeeds
def root_agent
@root_agent
end
|
#supervisor_mode ⇒ Object
Returns the value of attribute supervisor_mode.
50
51
52
|
# File 'lib/fluent/engine.rb', line 50
# Reader for the supervisor-mode flag.
# @return [Boolean] the value of @supervisor_mode (false after #initialize);
#   when true, #worker_id returns -1
def supervisor_mode
@supervisor_mode
end
|
#system_config ⇒ Object
Returns the value of attribute system_config.
50
51
52
|
# File 'lib/fluent/engine.rb', line 50
# Reader for the engine's system configuration.
# @return [Object] the value of @system_config (a fresh SystemConfig after
#   #initialize); #reload_config reads its +workers+ value
def system_config
@system_config
end
|
Instance Method Details
#add_plugin_dir(dir) ⇒ Object
118
119
120
121
|
# File 'lib/fluent/engine.rb', line 118
# Deprecated shim: logs a deprecation warning, then forwards to
# Plugin.add_plugin_dir.
#
# @param dir [Object] passed through untouched to Plugin.add_plugin_dir
# @return [Object] whatever Plugin.add_plugin_dir returns
def add_plugin_dir(dir)
  deprecation = 'Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir'
  $log.warn(deprecation)

  Plugin.add_plugin_dir(dir)
end
|
#configure(conf) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/fluent/engine.rb', line 104
# Applies +conf+ to the current root agent and wires up log-event routing.
#
# After the root agent is configured, a log-event router is built from it and
# stored in @fluent_log_event_router. Event logging is enabled on $log only
# when that router reports it is emittable. Unless @suppress_config_dump is
# set, the configuration text is logged at info level.
#
# @param conf [Object] configuration handed to the root agent; its +to_s+ is
#   what gets logged
# @return [Object, nil] nil when the dump is suppressed, otherwise the result
#   of the $log.info call
def configure(conf)
  @root_agent.configure(conf)

  router = FluentLogEventRouter.build(@root_agent)
  @fluent_log_event_router = router
  $log.enable_event(true) if router.emittable?

  $log.info(:supervisor, "using configuration file: #{conf.to_s.rstrip}") unless @suppress_config_dump
end
|
#emit(tag, time, record) ⇒ Object
123
124
125
|
# File 'lib/fluent/engine.rb', line 123
# Retired entry point that exists only to fail loudly.
#
# All three arguments are ignored; every call raises and points the caller at
# router.emit.
#
# @raise [RuntimeError] always
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end
|
#emit_array(tag, array) ⇒ Object
127
128
129
|
# File 'lib/fluent/engine.rb', line 127
# Retired entry point that exists only to fail loudly.
#
# Both arguments are ignored; every call raises and points the caller at
# router.emit_array.
#
# @raise [RuntimeError] always
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end
|
#emit_stream(tag, es) ⇒ Object
131
132
133
|
# File 'lib/fluent/engine.rb', line 131
# Retired entry point that exists only to fail loudly.
#
# Both arguments are ignored; every call raises and points the caller at
# router.emit_stream.
#
# @raise [RuntimeError] always
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end
|
#flush! ⇒ Object
135
136
137
|
# File 'lib/fluent/engine.rb', line 135
# Delegates to the current root agent's flush!.
# @return [Object] whatever @root_agent.flush! returns
def flush!
@root_agent.flush!
end
|
#init(system_config, supervisor_mode: false) ⇒ Object
#log ⇒ Object
66
67
68
|
# File 'lib/fluent/engine.rb', line 66
# Returns the process-wide logger held in the $log global.
# @return [Object] the current value of $log
def log
$log
end
|
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
70
71
72
73
74
75
76
77
|
# File 'lib/fluent/engine.rb', line 70
# Parses a configuration source, choosing the parser from the file name.
#
# Names ending in ".rb" are handed to the Ruby DSL parser together with the
# path built from +basepath+ and +fname+; everything else goes to
# Config.parse.
#
# @param io [Object] configuration content, passed through to the parser
# @param fname [String, nil] configuration file name; nil never matches the
#   ".rb" test and therefore takes the Config.parse branch
# @param basepath [String] directory the file name is relative to
#   (defaults to the current working directory)
# @param v1_config [Boolean] forwarded to Config.parse only
# @return [Object] whatever the selected parser returns
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  # \z anchors at the real end of the string. `$` also matches before any
  # newline, so a name such as "x.rb\nfoo.conf" would be treated as Ruby code.
  if /\.rb\z/.match?(fname)
    # Required lazily: the DSL is only loaded when a Ruby config is used.
    require 'fluent/config/dsl'
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end
|
#push_log_event(tag, time, record) ⇒ Object
209
210
211
|
# File 'lib/fluent/engine.rb', line 209
# Hands one log event to the log-event router.
#
# The three values are packed into a single [tag, time, record] array, which
# is the only argument given to the router's emit_event.
#
# @param tag [Object] first element of the event array
# @param time [Object] second element of the event array
# @param record [Object] third element of the event array
# @return [Object] whatever emit_event returns
def push_log_event(tag, time, record)
  event = [tag, time, record]
  @fluent_log_event_router.emit_event(event)
end
|
#reload_config(conf, supervisor: false) ⇒ Object
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/fluent/engine.rb', line 167
# Replaces the running root agent with a new one configured from +conf+.
#
# Steps, in order:
# 1. build a new RootAgent and run Fluent::StaticConfigAnalysis over +conf+;
# 2. raise if any analysed plugin answers false to +reloadable_plugin?+
#    (nothing has been swapped at that point);
# 3. install the new agent as @root_agent and configure it, putting the
#    previous agent back if configuration raises;
# 4. log the configuration text unless @suppress_config_dump is set;
# 5. with +supervisor: true+, shut the previous agent down and return;
#    otherwise run stop_phase on the previous agent and start_phase on the
#    new one.
#
# @param conf [Object] the new configuration
# @param supervisor [Boolean] true to skip the stop/start phases and only
#   shut the previous agent down
# @return [Object, nil] nil in supervisor mode, otherwise the result of
#   start_phase
# @raise [Fluent::ConfigError] when the configuration contains a plugin that
#   declares itself not reloadable
def reload_config(conf, supervisor: false)
  new_agent = RootAgent.new(log: log, system_config: @system_config)
  ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

  ret.all_plugins.each do |plugin|
    if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
      raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
    end
  end

  # The new agent is installed *before* it is configured; the rescue below
  # restores the previous agent so a failed reload leaves the engine as it was.
  old_agent, @root_agent = @root_agent, new_agent
  begin
    @root_agent.configure(conf)
  rescue
    @root_agent = old_agent
    raise
  end

  unless @suppress_config_dump
    $log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
  end

  # Supervisor path: only retire the previous agent; no stop/start phases.
  if supervisor
    old_agent.shutdown
    return
  end

  stop_phase(old_agent)

  $log.info 'restart fluentd worker', worker: worker_id
  start_phase(new_agent)
end
|
#run ⇒ Object
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/fluent/engine.rb', line 144
# Runs the worker until #stop is requested, then performs the stop phase.
#
# Startup calls +start+ and starts the log-event router, then the method
# idles, sleeping MAINLOOP_SLEEP_INTERVAL between checks of @engine_stopped.
# Any exception raised during startup or the idle loop is logged and
# re-raised unchanged; in that case the stop phase below is not reached.
#
# @return [Object] whatever stop_phase returns
def run
  begin
    $log.info("starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id)
    start
    @fluent_log_event_router.start
    $log.info("fluentd worker is now running", worker: worker_id)

    until @engine_stopped
      sleep(MAINLOOP_SLEEP_INTERVAL)
    end

    $log.info("fluentd worker is now stopping", worker: worker_id)
  rescue Exception => error
    # Exception (not only StandardError) is caught so every failure gets
    # logged; it is always re-raised, never swallowed.
    $log.error("unexpected error", error: error)
    $log.error_backtrace
    raise
  end

  stop_phase(@root_agent)
end
|
#run_configure(conf, dry_run: false) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/engine.rb', line 79
# Configures the engine, then warns about configuration that nothing consumed.
#
# For each (key, element) pair yielded by conf.check_not_fetched a warning
# text is chosen:
# * the element reports a parent and a plugin  -> unused section of that plugin
# * the element reports only a parent          -> unused section of that parent
# * otherwise                                  -> unused parameter, except for
#   elements named 'system', and elements named 'source' while
#   @without_source is set, which produce no warning at all
#
# The warning is logged with the :supervisor marker during a supervisor-mode
# dry run, with the :worker0 marker when the element is for every worker,
# with no marker when it is for this worker, and not at all otherwise.
#
# @param conf [Object] the configuration to apply and inspect
# @param dry_run [Boolean] true when only validating the configuration
# @return [Object] whatever conf.check_not_fetched returns
def run_configure(conf, dry_run: false)
  configure(conf)

  conf.check_not_fetched do |key, element|
    parent, plugin = element.unused_in

    warning =
      if parent
        if plugin
          "section <#{element.name}> is not used in <#{parent}> of #{plugin} plugin"
        else
          "section <#{element.name}> is not used in <#{parent}>"
        end
      elsif element.name != 'system' && (!@without_source || element.name != 'source')
        "parameter '#{key}' in #{element.to_s.strip} is not used."
      end
    next unless warning

    if dry_run && @supervisor_mode
      $log.warn(:supervisor, warning)
    elsif element.for_every_workers?
      $log.warn(:worker0, warning)
    elsif element.for_this_worker?
      $log.warn(warning)
    end
  end
end
|
#stop ⇒ Object
204
205
206
207
|
# File 'lib/fluent/engine.rb', line 204
# Asks the main loop to finish.
# Only sets the @engine_stopped flag; #run checks that flag between sleeps
# and moves on to its stop phase once it is true.
# @return [nil]
def stop
@engine_stopped = true
nil
end
|
#worker_id ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
|
# File 'lib/fluent/engine.rb', line 213
# Identifies the current worker.
#
# In supervisor mode the answer is always -1 and nothing is cached.
# Otherwise the id is read once from the SERVERENGINE_WORKER_ID environment
# variable (0 when the variable is unset), converted with to_i, and cached in
# @_worker_id for later calls.
#
# @return [Integer]
def worker_id
  return -1 if @supervisor_mode

  @_worker_id ||= ENV.fetch('SERVERENGINE_WORKER_ID', 0).to_i
end
|