Class: LogStash::PipelineAction::Reload
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline_action/reload.rb
Instance Method Summary collapse
- #execute(agent, pipelines_registry) ⇒ Object
-
#initialize(pipeline_config, metric) ⇒ Reload
constructor
A new instance of Reload.
- #pipeline_id ⇒ Object
- #to_s ⇒ Object
Methods inherited from Base
#<=>, #execution_priority, #inspect
Constructor Details
#initialize(pipeline_config, metric) ⇒ Reload
Returns a new instance of Reload.
10 11 12 13 |
# File 'lib/logstash/pipeline_action/reload.rb', line 10 def initialize(pipeline_config, metric) @pipeline_config = pipeline_config @metric = metric end |
Instance Method Details
#execute(agent, pipelines_registry) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/logstash/pipeline_action/reload.rb', line 23 def execute(agent, pipelines_registry) old_pipeline = pipelines_registry.get_pipeline(pipeline_id) if old_pipeline.nil? return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the pipeline does not exist") end if !old_pipeline.reloadable? return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable") end begin pipeline_validator = if @pipeline_config.settings.get_value("pipeline.java_execution") LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) else agent.exclusive do # The Ruby pipeline initialization is not thread safe because of the module level # shared state in LogsStash::Config::AST. When using multiple pipelines this gets # executed simultaneously in different threads and we need to synchronize this initialization. LogStash::BasePipeline.new(@pipeline_config) end end rescue => e return LogStash::ConvergeResult::FailedAction.from_exception(e) end if !pipeline_validator.reloadable? return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the new pipeline is not reloadable") end logger.info("Reloading pipeline", "pipeline.id" => pipeline_id) success = pipelines_registry.reload_pipeline(pipeline_id) do # important NOT to explicitly return from block here # the block must emit a success boolean value # First shutdown old pipeline old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) } old_pipeline.thread.join # Then create a new pipeline new_pipeline = if @pipeline_config.settings.get_value("pipeline.java_execution") LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) else agent.exclusive do # The Ruby pipeline initialization is not thread safe because of the module level # shared state in LogsStash::Config::AST. When using multiple pipelines this gets # executed simultaneously in different threads and we need to synchronize this initialization. LogStash::Pipeline.new(@pipeline_config, @metric, agent) end end success = new_pipeline.start # block until the pipeline is correctly started or crashed # return success and new_pipeline to registry reload_pipeline [success, new_pipeline] end LogStash::ConvergeResult::ActionResult.create(self, success) end |
#pipeline_id ⇒ Object
15 16 17 |
# File 'lib/logstash/pipeline_action/reload.rb', line 15 def pipeline_id @pipeline_config.pipeline_id end |
#to_s ⇒ Object
19 20 21 |
# File 'lib/logstash/pipeline_action/reload.rb', line 19 def to_s "PipelineAction::Reload<#{pipeline_id}>" end |