Class: LogStash::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/pipeline.rb

Instance Method Summary collapse

Constructor Details

#initialize(configstr) ⇒ Pipeline

Returns a new instance of Pipeline.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/logstash/pipeline.rb', line 14

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)
  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end

  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile
  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs
  if !filters?
    @filter_to_output = @input_to_filter
  else
    @filter_to_output = SizedQueue.new(20)
  end
  @settings = {
    "filter-workers" => 1,
  }

  @run_mutex = Mutex.new
  @ready = false
  @started = false
  @input_threads = []
end

Instance Method Details

#configure(setting, value) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/pipeline.rb', line 61

def configure(setting, value)
  if setting == "filter-workers"
    # Abort if we have any filters that aren't threadsafe
    if value > 1 && @filters.any? { |f| !f.threadsafe? }
      plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
      raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
    end
  end
  @settings[setting] = value
end

#filter(event, &block) ⇒ Object

for backward compatibility in devutils for the rspec helpers, this method is not used in the pipeline anymore.



296
297
298
299
# File 'lib/logstash/pipeline.rb', line 296

def filter(event, &block)
  # filter_func returns all filtered events, including cancelled ones
  filter_func(event).each { |e| block.call(e) }
end

#filters?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/logstash/pipeline.rb', line 72

def filters?
  return @filters.any?
end

#filterworkerObject

def inputworker



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/logstash/pipeline.rb', line 209

def filterworker
  LogStash::Util::set_thread_name("|worker")
  begin
    while true
      event = @input_to_filter.pop

      case event
      when LogStash::Event
        # filter_func returns all filtered events, including cancelled ones
        filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
      when LogStash::FlushEvent
        # handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
        # don't have to deal with thread safety implementing the flush method
        flush_filters_to_output!
      when LogStash::ShutdownEvent
        # pass it down to any other filterworker and stop this worker
        @input_to_filter.push(event)
        break
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end

#flush_filters(options = {}, &block) ⇒ Object

perform filters flush and yeild flushed event to the passed block

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    > true to signal a final shutdown flush



304
305
306
307
308
309
310
# File 'lib/logstash/pipeline.rb', line 304

def flush_filters(options = {}, &block)
  flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

  flushers.each do |flusher|
    flusher.call(options, &block)
  end
end

#flush_filters_to_output!(options = {}) ⇒ Object

perform filters flush into the output queue

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    > true to signal a final shutdown flush



315
316
317
318
319
320
321
322
# File 'lib/logstash/pipeline.rb', line 315

def flush_filters_to_output!(options = {})
  flush_filters(options) do |event|
    unless event.cancelled?
      @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
      @filter_to_output.push(event)
    end
  end
end

#inputworker(plugin) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/logstash/pipeline.rb', line 173

def inputworker(plugin)
  LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
  begin
    plugin.run(@input_to_filter)
  rescue LogStash::ShutdownSignal
    # ignore and quit
  rescue => e
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end
    puts e.backtrace if @logger.debug?
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
    sleep 1
    retry
  end
ensure
  begin
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
  rescue LogStash::ShutdownSignal
    # teardown could receive the ShutdownSignal, retry it
    retry
  end
end

#outputworkerObject

def filterworker



236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/logstash/pipeline.rb', line 236

def outputworker
  LogStash::Util::set_thread_name(">output")
  @outputs.each(&:worker_setup)

  while true
    event = @filter_to_output.pop
    break if event == LogStash::SHUTDOWN
    output_func(event)
  end # while true

  @outputs.each do |output|
    output.worker_plugins.each(&:teardown)
  end
end

#plugin(plugin_type, name, *args) ⇒ Object

def shutdown



288
289
290
291
292
# File 'lib/logstash/pipeline.rb', line 288

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end

#ready?Boolean

def initialize

Returns:

  • (Boolean)


53
54
55
# File 'lib/logstash/pipeline.rb', line 53

def ready?
  @run_mutex.synchronize{@ready}
end

#runObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/pipeline.rb', line 76

def run
  @run_mutex.synchronize{@started = true}

  # synchronize @input_threads between run and shutdown
  @run_mutex.synchronize{start_inputs}
  start_filters if filters?
  start_outputs

  @run_mutex.synchronize{@ready = true}

  @logger.info("Pipeline started")
  @logger.terminal("Logstash startup completed")

  wait_inputs

  if filters?
    shutdown_filters
    wait_filters
    flush_filters_to_output!(:final => true)
  end

  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")
  @logger.terminal("Logstash shutdown completed")

  # exit code
  return 0
end

#shutdownObject

Shutdown this pipeline.

This method is intended to be called from another thread



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/logstash/pipeline.rb', line 254

def shutdown
  @input_threads.each do |thread|
    # Interrupt all inputs
    @logger.info("Sending shutdown signal to input thread", :thread => thread)

    # synchronize both ShutdownSignal and teardown below. by synchronizing both
    # we will avoid potentially sending a shutdown signal when the inputworker is
    # executing the teardown method.
    @run_mutex.synchronize do
      thread.raise(LogStash::ShutdownSignal)
      begin
        thread.wakeup # in case it's in blocked IO or sleeping
      rescue ThreadError
      end
    end
  end

  # sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
  @inputs.each do |input|
    begin
      # input teardown must be synchronized since is can be called concurrently by
      # the input worker thread and from the pipeline thread shutdown method.
      # this means that input teardown methods must support multiple calls.
      @run_mutex.synchronize{input.teardown}
    rescue LogStash::ShutdownSignal
      # teardown could receive the ShutdownSignal, retry it
      retry
    end
  end

  # No need to send the ShutdownEvent to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
end

#shutdown_filtersObject



117
118
119
120
# File 'lib/logstash/pipeline.rb', line 117

def shutdown_filters
  @flusher_thread.kill
  @input_to_filter.push(LogStash::SHUTDOWN)
end

#shutdown_outputsObject



126
127
128
129
# File 'lib/logstash/pipeline.rb', line 126

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::SHUTDOWN)
end

#start_filtersObject



153
154
155
156
157
158
159
160
# File 'lib/logstash/pipeline.rb', line 153

def start_filters
  @filters.each(&:register)
  @filter_threads = @settings["filter-workers"].times.collect do
    Thread.new { filterworker }
  end

  @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end

#start_input(plugin) ⇒ Object



169
170
171
# File 'lib/logstash/pipeline.rb', line 169

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end

#start_inputsObject



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/logstash/pipeline.rb', line 136

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end

#start_outputsObject



162
163
164
165
166
167
# File 'lib/logstash/pipeline.rb', line 162

def start_outputs
  @outputs.each(&:register)
  @output_threads = [
    Thread.new { outputworker }
  ]
end

#started?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/logstash/pipeline.rb', line 57

def started?
  @run_mutex.synchronize{@started}
end

#wait_filtersObject



122
123
124
# File 'lib/logstash/pipeline.rb', line 122

def wait_filters
  @filter_threads.each(&:join) if @filter_threads
end

#wait_inputsObject

def run



107
108
109
110
111
112
113
114
115
# File 'lib/logstash/pipeline.rb', line 107

def wait_inputs
  @input_threads.each(&:join)
rescue Interrupt
  # rbx does weird things during do SIGINT that I haven't debugged
  # so we catch Interrupt here and signal a shutdown. For some reason the
  # signal handler isn't invoked it seems? I dunno, haven't looked much into
  # it.
  shutdown
end

#wait_outputsObject



131
132
133
134
# File 'lib/logstash/pipeline.rb', line 131

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end