Class: LogStash::PipelinesRegistry
- Inherits:
- Object
- Inheritance chain: Object → LogStash::PipelinesRegistry
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipelines_registry.rb
Instance Attribute Summary collapse
-
#states ⇒ Object
readonly
Returns the value of attribute states.
Instance Method Summary collapse
-
#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean
Execute the passed creation logic block and create a new state upon success.
-
#empty? ⇒ Boolean
True if the states collection is empty.
-
#get_pipeline(pipeline_id) ⇒ Pipeline
The pipeline object or nil if none for pipeline_id.
-
#initialize ⇒ PipelinesRegistry
constructor
A new instance of PipelinesRegistry.
- #non_running_pipelines ⇒ Hash{String=>Pipeline}
-
#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean
Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state.
- #running_pipelines ⇒ Hash{String=>Pipeline}
- #running_user_defined_pipelines ⇒ Hash{String=>Pipeline}
-
#size ⇒ Fixnum
Number of items in the states collection.
-
#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object
Execute the passed termination logic block.
Constructor Details
#initialize ⇒ PipelinesRegistry
Returns a new instance of PipelinesRegistry.
32 33 34 35 36 37 38 39 |
# File 'lib/logstash/pipelines_registry.rb', line 32
#
# Builds an empty registry backed by two Java ConcurrentHashMap instances,
# stored in @states and @locks.
#
# Rationale kept from the original author: ConcurrentHashMap#compute is
# atomic, and concurrent compute() calls block until the running one has
# finished, so no extra mutex is needed to synchronize compute calls.
#
# NOTE(review): nothing visible in this section populates @locks; presumably
# get_lock does -- confirm against the rest of the file.
#
# @return [PipelinesRegistry] a new instance of PipelinesRegistry
def initialize
  concurrent_map = java.util.concurrent.ConcurrentHashMap
  @states = concurrent_map.new
  @locks = concurrent_map.new
end
Instance Attribute Details
#states ⇒ Object (readonly)
Returns the value of attribute states.
29 30 31 |
# File 'lib/logstash/pipelines_registry.rb', line 29
#
# Read-only accessor for the collection held in @states.
#
# @return [Object] the value of attribute states
def states
  @states
end
Instance Method Details
#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean
Execute the passed creation logic block and create a new state upon success
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/logstash/pipelines_registry.rb', line 49
#
# Execute the passed creation logic block and create a new state upon success.
#
# Behavior by current registry content for pipeline_id:
# * no state: the block is run and, if it returns a truthy value, a new
#   PipelineState is stored;
# * a terminated state: the block is run and the state's pipeline is replaced
#   with the given one (regardless of the block's result), then the state is
#   stored again;
# * a non-terminated state: an error is logged, the block is NOT run and
#   false is returned.
#
# The whole operation runs while holding the lock returned by get_lock.
#
# @param pipeline_id the identifier of the pipeline
# @param pipeline [Pipeline] the pipeline object to register
# @param create_block the creation logic; invoked through yield, no arguments
# @yieldreturn [Boolean] whether the creation succeeded
# @return [Boolean] the block's result, or false if the block was not run
def create_pipeline(pipeline_id, pipeline, &create_block)
  # Acquire the lock BEFORE entering the protected region: if get_lock or
  # lock.lock raises, we must not call unlock on a nil or unheld lock, which
  # would replace the real error with an unrelated one.
  lock = get_lock(pipeline_id)
  lock.lock

  begin
    success = false

    state = @states.get(pipeline_id)
    if state
      if state.terminated?
        success = yield
        state.set_pipeline(pipeline)
      else
        logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
      end
      @states.put(pipeline_id, state)
    else
      success = yield
      @states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
    end

    success
  ensure
    lock.unlock
  end
end
#empty? ⇒ Boolean
Returns true if the states collection is empty.
140 141 142 |
# File 'lib/logstash/pipelines_registry.rb', line 140
#
# Reports whether the states collection holds no entries, by delegating to
# isEmpty on @states.
#
# @return [Boolean] true if the states collection is empty
def empty?
  @states.isEmpty
end
#get_pipeline(pipeline_id) ⇒ Pipeline
Returns the pipeline object or nil if none for pipeline_id.
129 130 131 132 |
# File 'lib/logstash/pipelines_registry.rb', line 129
#
# Looks up the state registered under pipeline_id and returns its pipeline.
#
# @param pipeline_id the identifier of the pipeline
# @return [Pipeline] the pipeline object or nil if none for pipeline_id
def get_pipeline(pipeline_id)
  # Safe navigation yields nil when no state is registered for this id.
  @states.get(pipeline_id)&.pipeline
end
#non_running_pipelines ⇒ Hash{String=>Pipeline}
150 151 152 |
# File 'lib/logstash/pipelines_registry.rb', line 150
#
# Selects the pipelines whose state reports terminated?.
# The selection itself is delegated to select_pipelines with a per-state filter.
#
# @return [Hash{String=>Pipeline}]
def non_running_pipelines
  select_pipelines do |pipeline_state|
    pipeline_state.terminated?
  end
end
#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean
Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/logstash/pipelines_registry.rb', line 102
#
# Execute the passed reloading logic block in the context of the reloading
# state and set new pipeline in state.
#
# When no state exists for pipeline_id an error is logged, the block is NOT
# run and false is returned. Otherwise the state is flagged as reloading for
# the duration of the block (the flag is cleared even if the block raises),
# the pipeline returned by the block is set on the state, and the state is
# stored again.
#
# The whole operation runs while holding the lock returned by get_lock.
#
# @param pipeline_id the identifier of the pipeline
# @param reload_block the reloading logic; invoked through yield, no arguments
# @yieldreturn [Array] a two-element array: success flag, then the new pipeline
# @return [Boolean] the success flag from the block, or false if it was not run
def reload_pipeline(pipeline_id, &reload_block)
  # Acquire the lock BEFORE entering the protected region: if get_lock or
  # lock.lock raises, we must not call unlock on a nil or unheld lock, which
  # would replace the real error with an unrelated one.
  lock = get_lock(pipeline_id)
  lock.lock

  begin
    success = false

    state = @states.get(pipeline_id)
    if state.nil?
      logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
      @states.remove(pipeline_id)
    else
      state.set_reloading(true)
      begin
        success, new_pipeline = yield
        state.set_pipeline(new_pipeline)
      ensure
        # Always clear the flag, even when the reload logic raised.
        state.set_reloading(false)
      end
      @states.put(pipeline_id, state)
    end

    success
  ensure
    lock.unlock
  end
end
#running_pipelines ⇒ Hash{String=>Pipeline}
145 146 147 |
# File 'lib/logstash/pipelines_registry.rb', line 145
#
# Selects the pipelines whose state does not report terminated?.
# The selection itself is delegated to select_pipelines with a per-state filter.
#
# @return [Hash{String=>Pipeline}]
def running_pipelines
  select_pipelines do |pipeline_state|
    !pipeline_state.terminated?
  end
end
#running_user_defined_pipelines ⇒ Hash{String=>Pipeline}
155 156 157 |
# File 'lib/logstash/pipelines_registry.rb', line 155
#
# Selects the pipelines whose state does not report terminated? and whose
# pipeline does not report system?.
# The selection itself is delegated to select_pipelines with a per-state filter.
#
# @return [Hash{String=>Pipeline}]
def running_user_defined_pipelines
  select_pipelines do |pipeline_state|
    # system? is only consulted for states that are not terminated.
    !(pipeline_state.terminated? || pipeline_state.pipeline.system?)
  end
end
#size ⇒ Fixnum
Returns number of items in the states collection.
135 136 137 |
# File 'lib/logstash/pipelines_registry.rb', line 135
#
# Reports how many entries the states collection holds, by delegating to
# size on @states.
#
# @return [Fixnum] number of items in the states collection
def size
  @states.size
end
#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object
Execute the passed termination logic block
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/logstash/pipelines_registry.rb', line 79
#
# Execute the passed termination logic block.
#
# When no state exists for pipeline_id an error is logged and the block is
# NOT run. Otherwise the block is called with the state's pipeline and the
# state is stored again afterwards.
#
# The whole operation runs while holding the lock returned by get_lock.
#
# @param pipeline_id the identifier of the pipeline
# @param stop_block the termination logic; invoked through yield
# @yieldparam pipeline [Pipeline] the pipeline held by the registered state
# @return [Object] the result of the final @states call (remove or put)
def terminate_pipeline(pipeline_id, &stop_block)
  # Acquire the lock BEFORE entering the protected region: if get_lock or
  # lock.lock raises, we must not call unlock on a nil or unheld lock, which
  # would replace the real error with an unrelated one.
  lock = get_lock(pipeline_id)
  lock.lock

  begin
    state = @states.get(pipeline_id)
    if state.nil?
      logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
      @states.remove(pipeline_id)
    else
      yield(state.pipeline)
      @states.put(pipeline_id, state)
    end
  ensure
    lock.unlock
  end
end