Class: LogStash::Pipeline

Inherits: Object
Defined in: lib/logstash/pipeline.rb
Instance Method Summary

- #configure(setting, value) ⇒ Object
- #filter(event, &block) ⇒ Object
  Kept for backward compatibility with the devutils rspec helpers; the pipeline itself no longer uses this method.
- #filters? ⇒ Boolean
- #filterworker ⇒ Object
  Filter worker loop: pops events from the input queue, runs them through the compiled filters, and pushes the results to the output queue.
- #flush_filters(options = {}, &block) ⇒ Object
  Performs a filter flush and yields each flushed event to the given block.
- #flush_filters_to_output!(options = {}) ⇒ Object
  Performs a filter flush and pushes the flushed events into the output queue.
- #initialize(configstr) ⇒ Pipeline constructor
  A new instance of Pipeline.
- #inputworker(plugin) ⇒ Object
- #outputworker ⇒ Object
  Output worker loop: pops events from the output queue and runs them through the compiled outputs until a shutdown marker arrives.
- #plugin(plugin_type, name, *args) ⇒ Object
  Looks up a plugin class by type and name and returns a new instance of it.
- #ready? ⇒ Boolean
  Whether the pipeline has finished starting up.
- #run ⇒ Object
- #shutdown ⇒ Object
  Shuts down this pipeline.
- #shutdown_filters ⇒ Object
- #shutdown_outputs ⇒ Object
- #start_filters ⇒ Object
- #start_input(plugin) ⇒ Object
- #start_inputs ⇒ Object
- #start_outputs ⇒ Object
- #started? ⇒ Boolean
- #wait_filters ⇒ Object
- #wait_inputs ⇒ Object
  Waits for all input threads to finish.
- #wait_outputs ⇒ Object
Constructor Details
#initialize(configstr) ⇒ Pipeline
Returns a new instance of Pipeline.
# File 'lib/logstash/pipeline.rb', line 15

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)

  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end
  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile
  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs
  if !filters?
    @filter_to_output = @input_to_filter
  else
    @filter_to_output = SizedQueue.new(20)
  end

  @settings = {
    "filter-workers" => 1,
  }

  @run_mutex = Mutex.new
  @ready = false
  @started = false
  @input_threads = []
end
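A minimal sketch of constructing a pipeline from a config string. The config below is illustrative and assumes the stdin input and stdout output plugins are installed; a config that fails to parse raises LogStash::ConfigurationError.

require "logstash/pipeline"

# Hypothetical config string; any valid Logstash config syntax works here.
config = <<-CONFIG
  input  { stdin  { } }
  output { stdout { } }
CONFIG

begin
  pipeline = LogStash::Pipeline.new(config)
rescue LogStash::ConfigurationError => e
  puts "Invalid config: #{e.message}"
end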
Instance Method Details
#configure(setting, value) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 62

def configure(setting, value)
  if setting == "filter-workers"
    # Abort if we have any filters that aren't threadsafe
    if value > 1 && @filters.any? { |f| !f.threadsafe? }
      plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
      raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
    end
  end
  @settings[setting] = value
end
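A short sketch of raising the filter worker count before calling #run; the value 2 is arbitrary, and the call raises LogStash::ConfigurationError if any configured filter is not threadsafe.

# Sketch: request two filter workers before the pipeline is run.
pipeline.configure("filter-workers", 2)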
#filter(event, &block) ⇒ Object
Kept for backward compatibility with the devutils rspec helpers; the pipeline itself no longer uses this method.
# File 'lib/logstash/pipeline.rb', line 299

def filter(event, &block)
  # filter_func returns all filtered events, including cancelled ones
  filter_func(event).each { |e| block.call(e) }
end
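A sketch of how the devutils rspec helpers might drive this method. The event contents are illustrative; note that cancelled events are yielded as well, so the block skips them explicitly.

# Sketch: push one event through the compiled filter chain.
event = LogStash::Event.new("message" => "hello world")
pipeline.filter(event) do |filtered|
  next if filtered.cancelled?   # cancelled events are yielded too
  puts filtered.to_hash.inspect
end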
#filters? ⇒ Boolean
# File 'lib/logstash/pipeline.rb', line 73

def filters?
  return @filters.any?
end
#filterworker ⇒ Object
Filter worker loop: pops events from the input queue, runs them through the compiled filters, and pushes the results to the output queue.
# File 'lib/logstash/pipeline.rb', line 210

def filterworker
  LogStash::Util::set_thread_name("|worker")
  begin
    while true
      event = @input_to_filter.pop

      case event
      when LogStash::Event
        # filter_func returns all filtered events, including cancelled ones
        filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
      when LogStash::FlushEvent
        # handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
        # don't have to deal with thread safety implementing the flush method
        flush_filters_to_output!
      when LogStash::ShutdownEvent
        # pass it down to any other filterworker and stop this worker
        @input_to_filter.push(event)
        break
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end
#flush_filters(options = {}, &block) ⇒ Object
Performs a filter flush and yields each flushed event to the given block.
# File 'lib/logstash/pipeline.rb', line 307

def flush_filters(options = {}, &block)
  flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

  flushers.each do |flusher|
    flusher.call(options, &block)
  end
end
#flush_filters_to_output!(options = {}) ⇒ Object
Performs a filter flush and pushes the flushed events into the output queue.
# File 'lib/logstash/pipeline.rb', line 318

def flush_filters_to_output!(options = {})
  flush_filters(options) do |event|
    unless event.cancelled?
      @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
      @filter_to_output.push(event)
    end
  end
end
#inputworker(plugin) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 174

def inputworker(plugin)
  LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
  begin
    plugin.run(@input_to_filter)
  rescue LogStash::ShutdownSignal
    # ignore and quit
  rescue => e
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end
    puts e.backtrace if @logger.debug?
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
    sleep 1
    retry
  end
ensure
  begin
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
  rescue LogStash::ShutdownSignal
    # teardown could receive the ShutdownSignal, retry it
    retry
  end
end
#outputworker ⇒ Object
Output worker loop: pops events from the output queue and runs them through the compiled outputs until a shutdown marker arrives.
# File 'lib/logstash/pipeline.rb', line 237

def outputworker
  LogStash::Util::set_thread_name(">output")
  @outputs.each(&:worker_setup)

  while true
    event = @filter_to_output.pop
    break if event == LogStash::SHUTDOWN
    output_func(event)
  end # while true

  @outputs.each do |output|
    output.worker_plugins.each(&:teardown)
  end
end
#plugin(plugin_type, name, *args) ⇒ Object
Looks up a plugin class by type and name and returns a new instance of it.
# File 'lib/logstash/pipeline.rb', line 291

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end
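A sketch of how the compiled config code uses this helper. The plugin names and options below are illustrative and must correspond to plugins that are actually installed, otherwise the lookup fails.

# Sketch: instantiate plugins by type and name, as the compiled config does.
input_plugin  = pipeline.plugin("input", "stdin")
filter_plugin = pipeline.plugin("filter", "mutate", "add_field" => { "via" => "pipeline" })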
#ready? ⇒ Boolean
Returns true once the pipeline has finished starting up.
# File 'lib/logstash/pipeline.rb', line 54

def ready?
  @run_mutex.synchronize{@ready}
end
#run ⇒ Object
# File 'lib/logstash/pipeline.rb', line 77

def run
  @run_mutex.synchronize{@started = true}

  # synchronize @input_threads between run and shutdown
  @run_mutex.synchronize{start_inputs}
  start_filters if filters?
  start_outputs

  @run_mutex.synchronize{@ready = true}

  @logger.info("Pipeline started")
  @logger.terminal("Logstash startup completed")

  wait_inputs

  if filters?
    shutdown_filters
    wait_filters
    flush_filters_to_output!(:final => true)
  end

  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")
  @logger.terminal("Logstash shutdown completed")

  # exit code
  return 0
end
#shutdown ⇒ Object
Shuts down this pipeline.
This method is intended to be called from a thread other than the one running #run.
# File 'lib/logstash/pipeline.rb', line 255

def shutdown
  InflightEventsReporter.logger = @logger
  InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)
  @input_threads.each do |thread|
    # Interrupt all inputs
    @logger.info("Sending shutdown signal to input thread", :thread => thread)

    # synchronize both ShutdownSignal and teardown below. by synchronizing both
    # we will avoid potentially sending a shutdown signal when the inputworker is
    # executing the teardown method.
    @run_mutex.synchronize do
      thread.raise(LogStash::ShutdownSignal)
      begin
        thread.wakeup # in case it's in blocked IO or sleeping
      rescue ThreadError
      end
    end
  end

  # sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
  @inputs.each do |input|
    begin
      # input teardown must be synchronized since is can be called concurrently by
      # the input worker thread and from the pipeline thread shutdown method.
      # this means that input teardown methods must support multiple calls.
      @run_mutex.synchronize{input.teardown}
    rescue LogStash::ShutdownSignal
      # teardown could receive the ShutdownSignal, retry it
      retry
    end
  end

  # No need to send the ShutdownEvent to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
end
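A sketch of the intended usage: #run blocks its caller until shutdown completes, so #shutdown is called from a different thread once #ready? reports that startup has finished. The polling interval is arbitrary.

# Sketch: run the pipeline on its own thread, shut it down from this one.
pipeline_thread = Thread.new { pipeline.run }

sleep 0.1 until pipeline.ready?   # wait for inputs/filters/outputs to start

# ... later, from this (other) thread:
pipeline.shutdown
pipeline_thread.join              # #run returns 0 once shutdown completes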
#shutdown_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 118

def shutdown_filters
  @flusher_thread.kill
  @input_to_filter.push(LogStash::SHUTDOWN)
end
#shutdown_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 127

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::SHUTDOWN)
end
#start_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 154

def start_filters
  @filters.each(&:register)
  @filter_threads = @settings["filter-workers"].times.collect do
    Thread.new { filterworker }
  end

  @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end
#start_input(plugin) ⇒ Object
# File 'lib/logstash/pipeline.rb', line 170

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end
#start_inputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 137

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end
#start_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 163

def start_outputs
  @outputs.each(&:register)
  @output_threads = [
    Thread.new { outputworker }
  ]
end
#started? ⇒ Boolean
# File 'lib/logstash/pipeline.rb', line 58

def started?
  @run_mutex.synchronize{@started}
end
#wait_filters ⇒ Object
# File 'lib/logstash/pipeline.rb', line 123

def wait_filters
  @filter_threads.each(&:join) if @filter_threads
end
#wait_inputs ⇒ Object
Waits for all input threads to finish.
# File 'lib/logstash/pipeline.rb', line 108

def wait_inputs
  @input_threads.each(&:join)
rescue Interrupt
  # rbx does weird things during do SIGINT that I haven't debugged
  # so we catch Interrupt here and signal a shutdown. For some reason the
  # signal handler isn't invoked it seems? I dunno, haven't looked much into
  # it.
  shutdown
end
#wait_outputs ⇒ Object
# File 'lib/logstash/pipeline.rb', line 132

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end