Class: LogStash::Pipeline
Inherits: Object
Defined in: lib/logstash/pipeline.rb
Instance Method Summary
- #configure(setting, value) ⇒ Object
- #filter(event, &block) ⇒ Object
  Kept for backward compatibility with the rspec helpers in devutils; no longer used by the pipeline itself.
- #filters? ⇒ Boolean
- #filterworker ⇒ Object
- #flush_filters(options = {}, &block) ⇒ Object
  Performs a filters flush and yields each flushed event to the passed block.
- #flush_filters_to_output!(options = {}) ⇒ Object
  Performs a filters flush into the output queue.
- #initialize(configstr) ⇒ Pipeline constructor
  A new instance of Pipeline.
- #inputworker(plugin) ⇒ Object
- #outputworker ⇒ Object
- #plugin(plugin_type, name, *args) ⇒ Object
- #ready? ⇒ Boolean
- #run ⇒ Object
- #shutdown ⇒ Object
  Shutdown this pipeline.
- #shutdown_filters ⇒ Object
- #shutdown_outputs ⇒ Object
- #start_filters ⇒ Object
- #start_input(plugin) ⇒ Object
- #start_inputs ⇒ Object
- #start_outputs ⇒ Object
- #started? ⇒ Boolean
- #wait_filters ⇒ Object
- #wait_inputs ⇒ Object
- #wait_outputs ⇒ Object
Constructor Details
#initialize(configstr) ⇒ Pipeline
Returns a new instance of Pipeline.
# File 'lib/logstash/pipeline.rb', line 14

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)

  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end

  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile

  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")

  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs
  if !filters?
    @filter_to_output = @input_to_filter
  else
    @filter_to_output = SizedQueue.new(20)
  end

  @settings = {
    "filter-workers" => 1,
  }

  @run_mutex = Mutex.new
  @ready = false
  @started = false
  @input_threads = []
end
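For orientation, a minimal construction sketch. The config string below is illustrative (any valid Logstash config works); only the constructor and the ConfigurationError behaviour come from this file.

require "logstash/pipeline"

# Hypothetical config string; stdin/stdout are just example plugins.
configstr = 'input { stdin { } } output { stdout { } }'

begin
  pipeline = LogStash::Pipeline.new(configstr)
rescue LogStash::ConfigurationError => e
  # Raised when the grammar cannot parse the config string.
  warn "invalid config: #{e.message}"
end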
Instance Method Details
#configure(setting, value) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 61

def configure(setting, value)
  if setting == "filter-workers"
    # Abort if we have any filters that aren't threadsafe
    if value > 1 && @filters.any? { |f| !f.threadsafe? }
      plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
      raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
    end
  end
  @settings[setting] = value
end
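A small usage sketch, assuming the pipeline was built from a config whose filters are all threadsafe (the worker count of 2 is just an example):

# Must happen before #run spawns the filter threads;
# raises LogStash::ConfigurationError if any filter is not threadsafe.
pipeline.configure("filter-workers", 2)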
#filter(event, &block) ⇒ Object
Kept for backward compatibility with the rspec helpers in devutils; this method is no longer used by the pipeline itself.
# File 'lib/logstash/pipeline.rb', line 296

def filter(event, &block)
  # filter_func returns all filtered events, including cancelled ones
  filter_func(event).each { |e| block.call(e) }
end
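A hedged sketch of direct use, in the style of the devutils spec helpers. It assumes the pipeline was built from a config that defines at least one filter; the event fields are made up:

# Hypothetical event; fields are only examples.
event = LogStash::Event.new("message" => "hello world")

results = []
pipeline.filter(event) { |e| results << e }  # yields every filtered event, including cancelled ones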
#filters? ⇒ Boolean
# File 'lib/logstash/pipeline.rb', line 72

def filters?
  return @filters.any?
end
#filterworker ⇒ Object
Filter worker loop: pops events from the input queue, runs filter_func on them, and pushes the results to the output queue.
# File 'lib/logstash/pipeline.rb', line 209

def filterworker
  LogStash::Util::set_thread_name("|worker")
  begin
    while true
      event = @input_to_filter.pop

      case event
      when LogStash::Event
        # filter_func returns all filtered events, including cancelled ones
        filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
      when LogStash::FlushEvent
        # handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
        # don't have to deal with thread safety implementing the flush method
        flush_filters_to_output!
      when LogStash::ShutdownEvent
        # pass it down to any other filterworker and stop this worker
        @input_to_filter.push(event)
        break
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end
#flush_filters(options = {}, &block) ⇒ Object
Performs a filters flush and yields each flushed event to the passed block.
# File 'lib/logstash/pipeline.rb', line 304

def flush_filters(options = {}, &block)
  flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

  flushers.each do |flusher|
    flusher.call(options, &block)
  end
end
#flush_filters_to_output!(options = {}) ⇒ Object
Performs a filters flush into the output queue.
# File 'lib/logstash/pipeline.rb', line 315

def flush_filters_to_output!(options = {})
  flush_filters(options) do |event|
    unless event.cancelled?
      @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
      @filter_to_output.push(event)
    end
  end
end
#inputworker(plugin) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 173

def inputworker(plugin)
  LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
  begin
    plugin.run(@input_to_filter)
  rescue LogStash::ShutdownSignal
    # ignore and quit
  rescue => e
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end
    puts e.backtrace if @logger.debug?

    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}

    sleep 1
    retry
  end
ensure
  begin
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
  rescue LogStash::ShutdownSignal
    # teardown could receive the ShutdownSignal, retry it
    retry
  end
end
#outputworker ⇒ Object
Output worker loop: pops events from the output queue and passes them to output_func until shutdown.
# File 'lib/logstash/pipeline.rb', line 236

def outputworker
  LogStash::Util::set_thread_name(">output")
  @outputs.each(&:worker_setup)

  while true
    event = @filter_to_output.pop
    break if event == LogStash::SHUTDOWN
    output_func(event)
  end # while true

  @outputs.each do |output|
    output.worker_plugins.each(&:teardown)
  end
end
#plugin(plugin_type, name, *args) ⇒ Object
Looks up a plugin class by type and name and returns a new instance of it.
# File 'lib/logstash/pipeline.rb', line 288

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end
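A brief sketch of the lookup. The plugin names and the mutate option below are only examples, not anything this file defines:

# With no args, an empty {} params hash is appended before instantiation.
output = pipeline.plugin("output", "stdout")

# With args, they are passed straight to the plugin class constructor.
filter = pipeline.plugin("filter", "mutate", "add_field" => { "seen" => "true" })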
#ready? ⇒ Boolean
Returns true once #run has finished starting the pipeline's inputs, filters, and outputs.
# File 'lib/logstash/pipeline.rb', line 53

def ready?
  @run_mutex.synchronize{@ready}
end
#run ⇒ Object
# File 'lib/logstash/pipeline.rb', line 76

def run
  @run_mutex.synchronize{@started = true}

  # synchronize @input_threads between run and shutdown
  @run_mutex.synchronize{start_inputs}
  start_filters if filters?
  start_outputs

  @run_mutex.synchronize{@ready = true}

  @logger.info("Pipeline started")
  @logger.terminal("Logstash startup completed")

  wait_inputs

  if filters?
    shutdown_filters
    wait_filters
    flush_filters_to_output!(:final => true)
  end

  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")
  @logger.terminal("Logstash shutdown completed")

  # exit code
  return 0
end
#shutdown ⇒ Object
Shutdown this pipeline.
This method is intended to be called from another thread.
# File 'lib/logstash/pipeline.rb', line 254

def shutdown
  @input_threads.each do |thread|
    # Interrupt all inputs
    @logger.info("Sending shutdown signal to input thread", :thread => thread)

    # synchronize both ShutdownSignal and teardown below. by synchronizing both
    # we will avoid potentially sending a shutdown signal when the inputworker is
    # executing the teardown method.
    @run_mutex.synchronize do
      thread.raise(LogStash::ShutdownSignal)
      begin
        thread.wakeup # in case it's in blocked IO or sleeping
      rescue ThreadError
      end
    end
  end

  # sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
  @inputs.each do |input|
    begin
      # input teardown must be synchronized since is can be called concurrently by
      # the input worker thread and from the pipeline thread shutdown method.
      # this means that input teardown methods must support multiple calls.
      @run_mutex.synchronize{input.teardown}
    rescue LogStash::ShutdownSignal
      # teardown could receive the ShutdownSignal, retry it
      retry
    end
  end

  # No need to send the ShutdownEvent to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
end
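Since #run blocks until the pipeline has stopped, shutdown is normally requested from a separate thread; a minimal sketch:

pipeline_thread = Thread.new { pipeline.run }

# ... later, e.g. from a signal handler or supervising thread ...
pipeline.shutdown if pipeline.started?
pipeline_thread.join  # #run returns once inputs, filters, and outputs have stopped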
#shutdown_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 117

def shutdown_filters
  @flusher_thread.kill
  @input_to_filter.push(LogStash::SHUTDOWN)
end
#shutdown_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 126

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::SHUTDOWN)
end
#start_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 153

def start_filters
  @filters.each(&:register)
  @filter_threads = @settings["filter-workers"].times.collect do
    Thread.new { filterworker }
  end

  @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end
#start_input(plugin) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 169

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end
#start_inputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 136

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end
#start_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 162

def start_outputs
  @outputs.each(&:register)
  @output_threads = [
    Thread.new { outputworker }
  ]
end
#started? ⇒ Boolean
# File 'lib/logstash/pipeline.rb', line 57

def started?
  @run_mutex.synchronize{@started}
end
#wait_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 122

def wait_filters
  @filter_threads.each(&:join) if @filter_threads
end
#wait_inputs ⇒ Object
Waits for all input threads to finish.
# File 'lib/logstash/pipeline.rb', line 107

def wait_inputs
  @input_threads.each(&:join)
rescue Interrupt
  # rbx does weird things during do SIGINT that I haven't debugged
  # so we catch Interrupt here and signal a shutdown. For some reason the
  # signal handler isn't invoked it seems? I dunno, haven't looked much into
  # it.
  shutdown
end
#wait_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 131

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end