23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/logstash/pipeline_action/reload.rb', line 23
def execute(agent, pipelines_registry)
old_pipeline = pipelines_registry.get_pipeline(pipeline_id)
if old_pipeline.nil?
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the pipeline does not exist")
end
if !old_pipeline.reloadable?
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
end
java_exec = @pipeline_config.settings.get_value("pipeline.java_execution")
begin
pipeline_validator = java_exec ? LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) : LogStash::BasePipeline.new(@pipeline_config)
rescue => e
return LogStash::ConvergeResult::FailedAction.from_exception(e)
end
if !pipeline_validator.reloadable?
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the new pipeline is not reloadable")
end
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
success = pipelines_registry.reload_pipeline(pipeline_id) do
old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) }
old_pipeline.thread.join
new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start
[success, new_pipeline]
end
LogStash::ConvergeResult::ActionResult.create(self, success)
end
|