Class: LogStash::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipeline.rb

Direct Known Subclasses

Pipeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline

Returns a new instance of BasePipeline.

Raises:

  • (ConfigurationError)

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/pipeline.rb', line 29

# Parses the pipeline configuration, compiles it to Ruby code and evaluates
# that code inside this instance.
#
# @param config_str [String] pipeline configuration source; it is hashed with
#   Digest::SHA1 and handed to the config grammar parser
# @param settings [#get_value] settings object, read for "pipeline.id" and
#   "config.debug" (defaults to SETTINGS)
# @raise [ConfigurationError] when the grammar parser returns nil for
#   config_str; the parser's failure_reason becomes the message
def initialize(config_str, settings = SETTINGS)
  # #logger is provided by the included Util::Loggable module
  @logger = self.logger
  @config_str = config_str
  # SHA1 hex digest of the config text; #plugin uses it as the prefix of
  # auto-generated plugin ids
  @config_hash = Digest::SHA1.hexdigest(@config_str)
  # Every time #plugin is invoked this is incremented to give each plugin
  # a unique id when auto-generating plugin ids
  @plugin_counter ||= 0

  # Falls back to this object's object_id when the "pipeline.id" setting
  # yields nil/false
  @pipeline_id = settings.get_value("pipeline.id") || self.object_id

  # Plugin ids that are already taken; #plugin stores `true` under each id
  # and raises when an id shows up twice
  @plugins_by_id = {}
  # Start out as nil; presumably assigned by the evaluated config code
  # below -- TODO confirm against the compiler output
  @inputs = nil
  @filters = nil
  @outputs = nil

  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  # A nil parse result is treated as a syntax error in the configuration
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  config_code = parsed_config.compile

  # config_code = BasePipeline.compileConfig(config_str)

  # The compiled code is only logged when the "config.debug" setting is on
  # AND the logger itself is at debug level
  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", :code => config_code)
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  #
  # NOTE(review): eval is called without an explicit binding, so the compiled
  # code runs with this method's binding (self, instance variables and the
  # locals defined above are all visible to it). Keep local names stable.
  # NOTE(review): the rescue clause only re-raises the exception it caught,
  # so it does not alter error handling.
  begin
    eval(config_code)
  rescue => e
    raise e
  end
end

Instance Attribute Details

#config_hash ⇒ Object (readonly)

Returns the value of attribute config_hash.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @config_hash, which #initialize sets to the SHA1 hex digest of
# the configuration string.
#
# @return [String] hex digest of the pipeline configuration text
def config_hash
  @config_hash
end

#config_str ⇒ Object (readonly)

Returns the value of attribute config_str.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @config_str, the configuration text that was passed to
# #initialize.
#
# @return [Object] the config_str constructor argument, unchanged
def config_str
  @config_str
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @filters. #initialize resets it to nil before evaluating the
# compiled configuration; presumably that evaluation assigns the filter
# plugins (an Array, judging by the `+` in #non_reloadable_plugins) --
# TODO confirm.
#
# @return [Object, nil] current value of @filters
def filters
  @filters
end

#inputs ⇒ Object (readonly)

Returns the value of attribute inputs.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @inputs. #initialize resets it to nil before evaluating the
# compiled configuration; presumably that evaluation assigns the input
# plugins (an Array, judging by the `+` in #non_reloadable_plugins) --
# TODO confirm.
#
# @return [Object, nil] current value of @inputs
def inputs
  @inputs
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @outputs. #initialize resets it to nil before evaluating the
# compiled configuration; presumably that evaluation assigns the output
# plugins (an Array, judging by the `+` in #non_reloadable_plugins) --
# TODO confirm.
#
# @return [Object, nil] current value of @outputs
def outputs
  @outputs
end

#pipeline_id ⇒ Object (readonly)

Returns the value of attribute pipeline_id.



27
28
29
# File 'lib/logstash/pipeline.rb', line 27

# Reader for @pipeline_id. #initialize takes it from the "pipeline.id"
# setting and falls back to the pipeline's own object_id when that setting
# yields nil/false.
#
# @return [Object] the configured pipeline id, or an Integer object_id
def pipeline_id
  @pipeline_id
end

Instance Method Details

#non_reloadable_plugins ⇒ Object



105
106
107
# File 'lib/logstash/pipeline.rb', line 105

# Lists the plugins that do not report themselves as reloadable.
#
# Inputs, filters and outputs are concatenated in that order and every
# plugin whose #reloadable? answer is falsy is kept.
#
# @return [Array] the non-reloadable plugins, in inputs/filters/outputs order
def non_reloadable_plugins
  all_plugins = inputs + filters + outputs
  all_plugins.reject(&:reloadable?)
end

#plugin(plugin_type, name, *args) ⇒ Object

Raises:

  • (ConfigurationError)

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/pipeline.rb', line 66

# Builds one plugin instance for the pipeline configuration.
#
# Each call bumps the plugin counter, settles the plugin's id (generating
# "<config_hash>-<counter>" when none was supplied), rejects duplicate ids
# and then wraps or instantiates the class returned by Plugin.lookup.
#
# @param plugin_type [String] "output" and "filter" are wrapped in their
#   delegator; any other value takes the input branch
# @param name [Object] plugin name, passed through to Plugin.lookup
# @param args [Array<Hash>] option hashes; merged left to right, so later
#   hashes win on key clashes
# @return [Object] an OutputDelegator, a FilterDelegator, or the plugin
#   instance itself for the input branch
# @raise [ConfigurationError] when the resolved id is already in use
def plugin(plugin_type, name, *args)
  @plugin_counter += 1

  # Fold the variadic option hashes into a single hash
  options = args.reduce({}, &:merge)

  # A missing or empty "id" is replaced by a generated one, which is also
  # written back into the options handed to the plugin
  given_id = options["id"]
  options["id"] = "#{@config_hash}-#{@plugin_counter}" if given_id.nil? || given_id.empty?
  id = options["id"]

  if @plugins_by_id[id]
    raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict"
  end
  @plugins_by_id[id] = true

  # @metric is used when set; in the BasePipeline context it is not, so a
  # NullMetric stands in
  root_metric = @metric || Instrument::NullMetric.new
  plugins_metric = root_metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
  # Namespace by pluralised type, e.g. 'input' => :inputs
  type_scoped_metric = plugins_metric.namespace(:"#{plugin_type}s")

  klass = Plugin.lookup(plugin_type, name)

  case plugin_type
  when "output"
    OutputDelegator.new(@logger, klass, type_scoped_metric, OutputDelegatorStrategyRegistry.instance, options)
  when "filter"
    FilterDelegator.new(@logger, klass, type_scoped_metric, options)
  else # input
    klass.new(options).tap do |input_plugin|
      input_plugin.metric = type_scoped_metric.namespace(id)
    end
  end
end

#reloadable? ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/logstash/pipeline.rb', line 101

# True when #non_reloadable_plugins is empty, i.e. every input, filter and
# output answered truthy to #reloadable?.
#
# @return [Boolean]
def reloadable?
  non_reloadable_plugins.empty?
end