Class: LogStash::PipelinesRegistry
- Inherits:
-
Object
- Object
- LogStash::PipelinesRegistry
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipelines_registry.rb
Instance Attribute Summary collapse
-
#states ⇒ Object
readonly
Returns the value of attribute states.
Instance Method Summary collapse
-
#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean
Execute the passed creation logic block and create a new state upon success.
-
#empty? ⇒ Boolean
True if the states collection is empty.
-
#get_pipeline(pipeline_id) ⇒ Pipeline
The pipeline object or nil if none for pipeline_id.
-
#initialize ⇒ PipelinesRegistry
constructor
A new instance of PipelinesRegistry.
- #non_running_pipelines ⇒ Hash{String=>Pipeline}
-
#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean
Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state.
- #running_pipelines ⇒ Hash{String=>Pipeline}
- #running_user_defined_pipelines ⇒ Hash{String=>Pipeline}
-
#size ⇒ Fixnum
Number of items in the states collection.
-
#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object
Execute the passed termination logic block.
Constructor Details
#initialize ⇒ PipelinesRegistry
Returns a new instance of PipelinesRegistry.
32 33 34 35 36 37 38 |
# File 'lib/logstash/pipelines_registry.rb', line 32 def initialize # we leverage the semantic of the Java ConcurrentHashMap for the # compute() method which is atomic; calling compute() concurrently # will block until the other compute finishes so no mutex is necessary # for synchronizing compute calls @states = java.util.concurrent.ConcurrentHashMap.new end |
Instance Attribute Details
#states ⇒ Object (readonly)
Returns the value of attribute states.
29 30 31 |
# File 'lib/logstash/pipelines_registry.rb', line 29 def states @states end |
Instance Method Details
#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean
Execute the passed creation logic block and create a new state upon success
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/logstash/pipelines_registry.rb', line 48 def create_pipeline(pipeline_id, pipeline, &create_block) success = false @states.compute(pipeline_id) do |_, state| if state if state.terminated? success = yield state.set_pipeline(pipeline) else logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id) end state else success = yield success ? PipelineState.new(pipeline_id, pipeline) : nil end end success end |
#empty? ⇒ Boolean
Returns true if the states collection is empty.
128 129 130 |
# File 'lib/logstash/pipelines_registry.rb', line 128 def empty? @states.isEmpty end |
#get_pipeline(pipeline_id) ⇒ Pipeline
Returns the pipeline object or nil if none for pipeline_id.
117 118 119 120 |
# File 'lib/logstash/pipelines_registry.rb', line 117 def get_pipeline(pipeline_id) state = @states.get(pipeline_id) state.nil? ? nil : state.pipeline end |
#non_running_pipelines ⇒ Hash{String=>Pipeline}
138 139 140 |
# File 'lib/logstash/pipelines_registry.rb', line 138 def non_running_pipelines select_pipelines { |state| state.terminated? } end |
#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean
Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/logstash/pipelines_registry.rb', line 93 def reload_pipeline(pipeline_id, &reload_block) success = false @states.compute(pipeline_id) do |_, state| if state.nil? logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id) nil else state.set_reloading(true) begin success, new_pipeline = yield state.set_pipeline(new_pipeline) ensure state.set_reloading(false) end state end end success end |
#running_pipelines ⇒ Hash{String=>Pipeline}
133 134 135 |
# File 'lib/logstash/pipelines_registry.rb', line 133 def running_pipelines select_pipelines { |state| !state.terminated? } end |
#running_user_defined_pipelines ⇒ Hash{String=>Pipeline}
143 144 145 |
# File 'lib/logstash/pipelines_registry.rb', line 143 def running_user_defined_pipelines select_pipelines { |state | !state.terminated? && !state.pipeline.system? } end |
#size ⇒ Fixnum
Returns number of items in the states collection.
123 124 125 |
# File 'lib/logstash/pipelines_registry.rb', line 123 def size @states.size end |
#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object
Execute the passed termination logic block
74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/logstash/pipelines_registry.rb', line 74 def terminate_pipeline(pipeline_id, &stop_block) @states.compute(pipeline_id) do |_, state| if state.nil? logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id) nil else yield(state.pipeline) state end end end |