Class: LogStash::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/pipeline.rb

Instance Method Summary collapse

Constructor Details

#initialize(configstr) ⇒ Pipeline

Returns a new instance of Pipeline.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/pipeline.rb', line 12

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)
  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end

  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile
  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs
  if !filters?
    @filter_to_output = @input_to_filter
  else
    @filter_to_output = SizedQueue.new(20)
  end
  @settings = {
    "filter-workers" => 1,
  }
end

Instance Method Details

#configure(setting, value) ⇒ Object



54
55
56
# File 'lib/logstash/pipeline.rb', line 54

def configure(setting, value)
  @settings[setting] = value
end

#filter(event, &block) ⇒ Object



254
255
256
# File 'lib/logstash/pipeline.rb', line 254

def filter(event, &block)
  @filter_func.call(event, &block)
end

#filter_flusherObject



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/logstash/pipeline.rb', line 262

def filter_flusher
  events = []
  @filters.each do |filter|

    # Filter any events generated so far in this flush.
    events.each do |event|
      # TODO(sissel): watchdog on flush filtration?
      unless event.cancelled?
        filter.filter(event)
      end
    end

    # TODO(sissel): watchdog on flushes?
    if filter.respond_to?(:flush)
      flushed = filter.flush
      events += flushed if !flushed.nil? && flushed.any?
    end
  end

  events.each do |event|
    @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
    @filter_to_output.push(event) unless event.cancelled?
  end
end

#filters?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/logstash/pipeline.rb', line 58

def filters?
  return @filters.any?
end

#filterworkerObject

def inputworker



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/logstash/pipeline.rb', line 180

def filterworker
  LogStash::Util::set_thread_name("|worker")
  begin
    while true
      event = @input_to_filter.pop
      if event == LogStash::ShutdownSignal
        @input_to_filter.push(event)
        break
      end


      # TODO(sissel): we can avoid the extra array creation here
      # if we don't guarantee ordering of origin vs created events.
      # - origin event is one that comes in naturally to the filter worker.
      # - created events are emitted by filters like split or metrics
      events = [event]
      filter(event) do |newevent|
        events << newevent
      end
      events.each do |event|
        next if event.cancelled?
        @filter_to_output.push(event)
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end

#inputworker(plugin) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/logstash/pipeline.rb', line 153

def inputworker(plugin)
  LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
  begin
    plugin.run(@input_to_filter)
  rescue LogStash::ShutdownSignal
    return
  rescue => e
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end
    puts e.backtrace if @logger.debug?
    plugin.teardown
    sleep 1
    retry
  end
rescue LogStash::ShutdownSignal
  # nothing
ensure
  plugin.teardown
end

#output(event) ⇒ Object



258
259
260
# File 'lib/logstash/pipeline.rb', line 258

def output(event)
  @output_func.call(event)
end

#outputworkerObject

def filterworker



211
212
213
214
215
216
217
218
219
220
221
# File 'lib/logstash/pipeline.rb', line 211

def outputworker
  LogStash::Util::set_thread_name(">output")
  @outputs.each(&:register)
  @outputs.each(&:worker_setup)
  while true
    event = @filter_to_output.pop
    break if event == LogStash::ShutdownSignal
    output(event)
  end # while true
  @outputs.each(&:teardown)
end

#plugin(plugin_type, name, *args) ⇒ Object

def shutdown



248
249
250
251
252
# File 'lib/logstash/pipeline.rb', line 248

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end

#ready?Boolean

def initialize

Returns:

  • (Boolean)


46
47
48
# File 'lib/logstash/pipeline.rb', line 46

def ready?
  return @ready
end

#runObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/pipeline.rb', line 62

def run
  @started = true
  @input_threads = []
  start_inputs
  start_filters if filters?
  start_outputs

  @ready = true

  @logger.info("Pipeline started")
  wait_inputs

  # In theory there's nothing to do to filters to tell them to shutdown?
  if filters?
    shutdown_filters
    wait_filters
  end
  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")

  # exit code
  return 0
end

#shutdownObject

Shutdown this pipeline.

This method is intended to be called from another thread



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/logstash/pipeline.rb', line 226

def shutdown
  @input_threads.each do |thread|
    # Interrupt all inputs
    @logger.info("Sending shutdown signal to input thread",
                 :thread => thread)
    thread.raise(LogStash::ShutdownSignal)
    begin
      thread.wakeup # in case it's in blocked IO or sleeping
    rescue ThreadError
    end

    # Sometimes an input is stuck in a blocking I/O
    # so we need to tell it to teardown directly
    @inputs.each do |input|
      input.teardown
    end
  end

  # No need to send the ShutdownSignal to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
end

#shutdown_filtersObject



98
99
100
# File 'lib/logstash/pipeline.rb', line 98

def shutdown_filters
  @input_to_filter.push(LogStash::ShutdownSignal)
end

#shutdown_outputsObject



106
107
108
109
# File 'lib/logstash/pipeline.rb', line 106

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::ShutdownSignal)
end

#start_filtersObject



133
134
135
136
137
138
139
140
141
# File 'lib/logstash/pipeline.rb', line 133

def start_filters
  @filters.each(&:register)
  @filter_threads = @settings["filter-workers"].times.collect do
    Thread.new { filterworker }
  end

  # Set up the periodic flusher thread.
  @flusher_thread = Thread.new { Stud.interval(5) { filter_flusher } }
end

#start_input(plugin) ⇒ Object



149
150
151
# File 'lib/logstash/pipeline.rb', line 149

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end

#start_inputsObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/logstash/pipeline.rb', line 116

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads-1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end

#start_outputsObject



143
144
145
146
147
# File 'lib/logstash/pipeline.rb', line 143

def start_outputs
  @output_threads = [
    Thread.new { outputworker }
  ]
end

#started?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/logstash/pipeline.rb', line 50

def started?
  return @started
end

#wait_filtersObject



102
103
104
# File 'lib/logstash/pipeline.rb', line 102

def wait_filters
  @filter_threads.each(&:join) if @filter_threads
end

#wait_inputsObject

def run



88
89
90
91
92
93
94
95
96
# File 'lib/logstash/pipeline.rb', line 88

def wait_inputs
  @input_threads.each(&:join)
rescue Interrupt
  # rbx does weird things during do SIGINT that I haven't debugged
  # so we catch Interrupt here and signal a shutdown. For some reason the
  # signal handler isn't invoked it seems? I dunno, haven't looked much into
  # it.
  shutdown
end

#wait_outputsObject



111
112
113
114
# File 'lib/logstash/pipeline.rb', line 111

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end