Class: LogStash::BasePipeline
- Inherits:
- Object
- Inheritance chain: Object → LogStash::BasePipeline
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline.rb
Direct Known Subclasses
Instance Attribute Summary
-
#config_hash ⇒ Object
readonly
Returns the value of attribute config_hash.
-
#config_str ⇒ Object
readonly
Returns the value of attribute config_str.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#inputs ⇒ Object
readonly
Returns the value of attribute inputs.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
Instance Method Summary
-
#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #non_reloadable_plugins ⇒ Object
- #plugin(plugin_type, name, *args) ⇒ Object
- #reloadable? ⇒ Boolean
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline
Returns a new instance of BasePipeline.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/pipeline.rb', line 29

# Builds a pipeline from a configuration string: records the config text and
# its SHA1 digest, resolves the pipeline id, parses and compiles the config,
# then evaluates the compiled code in the context of this instance.
#
# @param config_str [String] pipeline configuration text; it is hashed with
#   SHA1 and passed to LogStashConfigParser#parse
# @param settings [Object] settings store queried through #get_value for
#   "pipeline.id" and "config.debug" (defaults to SETTINGS)
# @raise [ConfigurationError] when the grammar cannot parse config_str; the
#   message is the parser's failure_reason
def initialize(config_str, settings = SETTINGS)
  @logger = self.logger
  @config_str = config_str
  @config_hash = Digest::SHA1.hexdigest(@config_str)

  # Every time #plugin is invoked this is incremented to give each plugin
  # a unique id when auto-generating plugin ids
  @plugin_counter ||= 0

  # Fall back to this object's id when no "pipeline.id" setting is present
  @pipeline_id = settings.get_value("pipeline.id") || self.object_id

  # Ids already claimed by a plugin; #plugin stores `true` under each id so
  # that a second plugin with the same id can be rejected
  @plugins_by_id = {}
  @inputs = nil
  @filters = nil
  @outputs = nil

  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  config_code = parsed_config.compile

  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", :code => config_code)
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  # NOTE(review): this runs generated Ruby derived from the configuration
  # text, so the configuration must come from a trusted source.
  # Errors raised by the evaluated code propagate to the caller unchanged.
  eval(config_code)
end
Instance Attribute Details
#config_hash ⇒ Object (readonly)
Returns the value of attribute config_hash.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute config_hash.
#
# @return [String] SHA1 hex digest of the configuration string, computed by
#   the constructor
def config_hash
  @config_hash
end
#config_str ⇒ Object (readonly)
Returns the value of attribute config_str.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute config_str.
#
# @return [String] the configuration text that was passed to the constructor
def config_str
  @config_str
end
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute filters.
#
# The constructor sets this to nil before evaluating the compiled config;
# presumably the evaluated code assigns the real collection (TODO confirm
# against the compiled output).
#
# @return [Object, nil] the current value of @filters
def filters
  @filters
end
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute inputs.
#
# The constructor sets this to nil before evaluating the compiled config;
# presumably the evaluated code assigns the real collection (TODO confirm
# against the compiled output).
#
# @return [Object, nil] the current value of @inputs
def inputs
  @inputs
end
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute outputs.
#
# The constructor sets this to nil before evaluating the compiled config;
# presumably the evaluated code assigns the real collection (TODO confirm
# against the compiled output).
#
# @return [Object, nil] the current value of @outputs
def outputs
  @outputs
end
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
27 28 29 |
# File 'lib/logstash/pipeline.rb', line 27

# Returns the value of attribute pipeline_id.
#
# @return [Object] the "pipeline.id" setting read by the constructor, or the
#   pipeline's own object_id when that setting is nil or false
def pipeline_id
  @pipeline_id
end
Instance Method Details
#non_reloadable_plugins ⇒ Object
105 106 107 |
# File 'lib/logstash/pipeline.rb', line 105

# Lists every input, filter and output plugin that reports itself as not
# reloadable.
#
# The constructor initializes the three collections to nil, so a collection
# that is still nil is treated as empty instead of raising NoMethodError on
# `nil + ...`.
#
# @return [Array] plugins whose #reloadable? is falsy, in the order inputs,
#   filters, outputs
def non_reloadable_plugins
  all_plugins = Array(inputs) + Array(filters) + Array(outputs)
  all_plugins.reject(&:reloadable?)
end
#plugin(plugin_type, name, *args) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/logstash/pipeline.rb', line 66

# Instantiates one plugin of the given type and registers its id.
#
# The trailing hashes are merged into a single settings hash. When that hash
# has no "id" (nil or empty), an id of the form "<config_hash>-<counter>" is
# generated and written back into it.
#
# @param plugin_type [String] "output" and "filter" are wrapped in their
#   delegators; any other value is handled as an input
# @param name [String] plugin name handed to Plugin.lookup
# @param args [Array<Hash>] settings hashes, merged left to right
# @return [Object] an OutputDelegator, a FilterDelegator, or the input
#   plugin instance itself
# @raise [ConfigurationError] when the id was already used by another plugin
def plugin(plugin_type, name, *args)
  @plugin_counter += 1

  # Collapse the array of arguments into a single merged hash
  args = args.reduce({}, &:merge)

  id = if args["id"].nil? || args["id"].empty?
         args["id"] = "#{@config_hash}-#{@plugin_counter}"
       else
         args["id"]
       end

  raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
  @plugins_by_id[id] = true

  # use NullMetric if called in the BasePipeline context otherwise use the @metric value
  metric = @metric || Instrument::NullMetric.new

  pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
  # Scope plugins of type 'input' to 'inputs'
  type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)

  klass = Plugin.lookup(plugin_type, name)

  case plugin_type
  when "output"
    OutputDelegator.new(@logger, klass, type_scoped_metric, OutputDelegatorStrategyRegistry.instance, args)
  when "filter"
    FilterDelegator.new(@logger, klass, type_scoped_metric, args)
  else # input
    # Inputs are not wrapped in a delegator; they get a metric scoped by id
    input_plugin = klass.new(args)
    input_plugin.metric = type_scoped_metric.namespace(id)
    input_plugin
  end
end
#reloadable? ⇒ Boolean
101 102 103 |
# File 'lib/logstash/pipeline.rb', line 101

# Tells whether this pipeline can be reloaded.
#
# @return [Boolean] true when #non_reloadable_plugins returns an empty
#   collection, false otherwise
def reloadable?
  non_reloadable_plugins.empty?
end