Class: LogStash::Pipeline
- Inherits:
-
Object
- Object
- LogStash::Pipeline
- Defined in:
- lib/logstash/pipeline.rb
Instance Method Summary collapse
- #configure(setting, value) ⇒ Object
- #filter(event, &block) ⇒ Object
- #filter_flusher ⇒ Object
- #filters? ⇒ Boolean
-
#filterworker ⇒ Object
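Worker loop that pops events from the input queue, runs them through the filters, and pushes the results to the output queue.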
-
#initialize(configstr) ⇒ Pipeline
constructor
A new instance of Pipeline.
- #inputworker(plugin) ⇒ Object
- #output(event) ⇒ Object
-
#outputworker ⇒ Object
Worker loop that registers the outputs, then sends each queued event to them until a shutdown signal arrives.
-
#plugin(plugin_type, name, *args) ⇒ Object
Looks up a plugin class by type and name and returns a new instance of it.
-
#ready? ⇒ Boolean
Whether #run has finished starting the inputs, filters and outputs.
- #run ⇒ Object
-
#shutdown ⇒ Object
Shutdown this pipeline.
- #shutdown_filters ⇒ Object
- #shutdown_outputs ⇒ Object
- #start_filters ⇒ Object
- #start_input(plugin) ⇒ Object
- #start_inputs ⇒ Object
- #start_outputs ⇒ Object
- #started? ⇒ Boolean
- #wait_filters ⇒ Object
-
#wait_inputs ⇒ Object
Waits for all input threads to finish; calls #shutdown if interrupted.
- #wait_outputs ⇒ Object
Constructor Details
#initialize(configstr) ⇒ Pipeline
Returns a new instance of Pipeline.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/logstash/pipeline.rb', line 12

# Builds a pipeline from a Logstash configuration string.
#
# The configuration is parsed and compiled to Ruby source, which is then
# evaluated in the context of this instance. Per the original author's note,
# the evaluated code initializes all the plugins and defines the filter and
# output methods.
#
# @param configstr [String] the pipeline configuration text
# @raise [LogStash::ConfigurationError] if the grammar cannot parse +configstr+
def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)

  # Lifecycle state read by #started?, #ready? and #shutdown. Setting it up
  # front gives the predicates a real boolean and gives #shutdown an empty
  # thread list to iterate if it is called before #run.
  @started = false
  @ready = false
  @input_threads = []

  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  raise LogStash::ConfigurationError, grammar.failure_reason if @config.nil?

  code = @config.compile
  # The config code is hard to represent as a log message, so just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")

  # NOTE(review): this evaluates Ruby generated from the configuration text,
  # so the configuration must come from a trusted source. Errors raised by
  # the generated code propagate to the caller unchanged.
  eval(code)

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs by sharing one queue.
  @filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter

  @settings = {
    "filter-workers" => 1,
  }
end
Instance Method Details
#configure(setting, value) ⇒ Object
54 55 56 |
# File 'lib/logstash/pipeline.rb', line 54

# Records +value+ under the key +setting+ in this pipeline's settings hash.
#
# @param setting [Object] settings key (e.g. "filter-workers")
# @param value [Object] value to store
# @return [Object] the stored value
def configure(setting, value)
  @settings.store(setting, value)
end
#filter(event, &block) ⇒ Object
254 255 256 |
# File 'lib/logstash/pipeline.rb', line 254

# Hands +event+ to the callable stored in @filter_func, forwarding the
# caller's block to it.
#
# @param event [Object] the event to filter
# @return [Object] whatever @filter_func returns
def filter(event, &block)
  filter_callable = @filter_func
  filter_callable.call(event, &block)
end
#filter_flusher ⇒ Object
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/logstash/pipeline.rb', line 262

# Runs one flush pass over the filters, in order.
#
# Events flushed by an earlier filter are run through every later filter,
# and all surviving (non-cancelled) events are pushed to the output queue.
#
# @return [Array] every event collected during this pass
def filter_flusher
  collected = []

  @filters.each do |plugin|
    # Run the events flushed so far through this filter.
    # TODO(sissel): watchdog on flush filtration?
    collected.each do |pending|
      plugin.filter(pending) unless pending.cancelled?
    end

    # TODO(sissel): watchdog on flushes?
    next unless plugin.respond_to?(:flush)

    batch = plugin.flush
    collected.concat(batch) if !batch.nil? && batch.any?
  end

  collected.each do |flushed_event|
    @logger.debug? && @logger.debug("Pushing flushed events", :event => flushed_event)
    next if flushed_event.cancelled?

    @filter_to_output.push(flushed_event)
  end
end
#filters? ⇒ Boolean
58 59 60 |
# File 'lib/logstash/pipeline.rb', line 58

# Whether @filters contains at least one truthy element.
#
# @return [Boolean]
def filters?
  @filters.any?
end
#filterworker ⇒ Object
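Worker loop that pops events from the input queue, runs them through the filters, and pushes the results to the output queue.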
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/logstash/pipeline.rb', line 180

# Thread body for a filter worker.
#
# Pops events from the input queue until it sees LogStash::ShutdownSignal,
# runs each one through #filter, and pushes the original event plus any
# events yielded by #filter to the output queue (cancelled events are
# dropped). Any StandardError ends the loop and is logged. Every filter is
# torn down before the method returns.
def filterworker
  LogStash::Util::set_thread_name("|worker")

  begin
    while true
      incoming = @input_to_filter.pop

      if incoming == LogStash::ShutdownSignal
        # Put the signal back on the queue before leaving -- presumably so
        # any other filter worker sharing the queue sees it as well.
        @input_to_filter.push(incoming)
        break
      end

      # TODO(sissel): we can avoid the extra array creation here
      # if we don't guarantee ordering of origin vs created events.
      # - origin event is one that comes in naturally to the filter worker.
      # - created events are emitted by filters like split or metrics
      batch = [incoming]
      filter(incoming) { |created| batch << created }

      batch.each do |candidate|
        @filter_to_output.push(candidate) unless candidate.cancelled?
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end
#inputworker(plugin) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/logstash/pipeline.rb', line 153 def inputworker(plugin) LogStash::Util::set_thread_name("<#{plugin.class.config_name}") begin plugin.run(@input_to_filter) rescue LogStash::ShutdownSignal return rescue => e if @logger.debug? @logger.error(I18n.t("logstash.pipeline.worker-error-debug", :plugin => plugin.inspect, :error => e.to_s, :exception => e.class, :stacktrace => e.backtrace.join("\n"))) else @logger.error(I18n.t("logstash.pipeline.worker-error", :plugin => plugin.inspect, :error => e)) end puts e.backtrace if @logger.debug? plugin.teardown sleep 1 retry end rescue LogStash::ShutdownSignal # nothing ensure plugin.teardown end |
#output(event) ⇒ Object
258 259 260 |
# File 'lib/logstash/pipeline.rb', line 258

# Hands +event+ to the callable stored in @output_func.
#
# @param event [Object] the event to emit
# @return [Object] whatever @output_func returns
def output(event)
  output_callable = @output_func
  output_callable.call(event)
end
#outputworker ⇒ Object
Worker loop that registers the outputs, then sends each queued event to them until a shutdown signal arrives.
211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/logstash/pipeline.rb', line 211

# Thread body for the output worker.
#
# Registers every output, runs each output's worker_setup, then passes
# events popped from the output queue to #output until it pops
# LogStash::ShutdownSignal. Finally tears every output down.
def outputworker
  LogStash::Util::set_thread_name(">output")

  @outputs.each { |sink| sink.register }
  @outputs.each { |sink| sink.worker_setup }

  # Consume events until the shutdown signal is popped.
  until (event = @filter_to_output.pop) == LogStash::ShutdownSignal
    output(event)
  end

  @outputs.each { |sink| sink.teardown }
end
#plugin(plugin_type, name, *args) ⇒ Object
Looks up a plugin class by type and name and returns a new instance of it.
248 249 250 251 252 |
# File 'lib/logstash/pipeline.rb', line 248

# Instantiates the plugin class registered under +plugin_type+ / +name+.
#
# @param plugin_type [Object] passed to LogStash::Plugin.lookup
# @param name [Object] passed to LogStash::Plugin.lookup
# @param args [Array] constructor arguments; an empty hash is used when
#   none are given
# @return [Object] the new plugin instance
def plugin(plugin_type, name, *args)
  args.push({}) if args.empty?
  LogStash::Plugin.lookup(plugin_type, name).new(*args)
end
#ready? ⇒ Boolean
Whether #run has finished starting the inputs, filters and outputs.
46 47 48 |
# File 'lib/logstash/pipeline.rb', line 46

# Reports the @ready flag, which #run sets to true once the inputs,
# filters and outputs have been started.
#
# @return [Boolean, nil] the current value of @ready
def ready?
  @ready
end
#run ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/pipeline.rb', line 62

# Runs the pipeline to completion on the calling thread.
#
# Starts inputs, filters (when any are configured) and outputs, blocks
# until the inputs finish, then shuts down and waits for the filter and
# output stages in that order.
#
# @return [Integer] always 0 (used as the exit code)
def run
  @started = true
  @input_threads = []

  # Bring the stages up.
  start_inputs
  start_filters if filters?
  start_outputs
  @ready = true
  @logger.info("Pipeline started")

  # Block here until every input thread has finished.
  wait_inputs

  # Drain the remaining stages front to back.
  # In theory there's nothing to do to filters to tell them to shutdown?
  if filters?
    shutdown_filters
    wait_filters
  end
  shutdown_outputs
  wait_outputs
  @logger.info("Pipeline shutdown complete.")

  0 # exit code
end
#shutdown ⇒ Object
Shutdown this pipeline.
This method is intended to be called from another thread
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/logstash/pipeline.rb', line 226

# Shutdown this pipeline.
#
# This method is intended to be called from another thread. It raises
# LogStash::ShutdownSignal in every input thread and then tears each input
# down once. It does nothing when no input thread has been started, which
# includes being called before #run.
#
# @return [Array] the input threads that were signalled
def shutdown
  threads = @input_threads || []

  # Interrupt all inputs first, so every thread has been signalled before
  # any input is torn down underneath it.
  threads.each do |thread|
    @logger.info("Sending shutdown signal to input thread", :thread => thread)
    thread.raise(LogStash::ShutdownSignal)
    begin
      thread.wakeup # in case it's in blocked IO or sleeping
    rescue ThreadError
      # Nothing to wake up (e.g. the thread has already finished).
    end
  end

  # Sometimes an input is stuck in a blocking I/O
  # so we need to tell it to teardown directly.
  @inputs.each(&:teardown) unless threads.empty?

  # No need to send the ShutdownSignal to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
  threads
end
#shutdown_filters ⇒ Object
98 99 100 |
# File 'lib/logstash/pipeline.rb', line 98

# Pushes LogStash::ShutdownSignal onto the input-to-filter queue, which is
# what #filterworker checks for to leave its loop.
def shutdown_filters
  stop_signal = LogStash::ShutdownSignal
  @input_to_filter.push(stop_signal)
end
#shutdown_outputs ⇒ Object
106 107 108 109 |
# File 'lib/logstash/pipeline.rb', line 106

# Pushes LogStash::ShutdownSignal onto the filter-to-output queue, which is
# what #outputworker checks for to leave its loop.
def shutdown_outputs
  stop_signal = LogStash::ShutdownSignal
  @filter_to_output.push(stop_signal)
end
#start_filters ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/logstash/pipeline.rb', line 133

# Registers every filter, then starts the filter worker threads and the
# periodic flusher thread.
#
# The number of workers comes from the "filter-workers" setting; the
# flusher calls #filter_flusher through Stud.interval(5).
#
# @return [Thread] the flusher thread
def start_filters
  @filters.each { |plugin| plugin.register }

  worker_count = @settings["filter-workers"]
  @filter_threads = worker_count.times.map { Thread.new { filterworker } }

  # Set up the periodic flusher thread.
  @flusher_thread = Thread.new do
    Stud.interval(5) { filter_flusher }
  end
end
#start_input(plugin) ⇒ Object
149 150 151 |
# File 'lib/logstash/pipeline.rb', line 149

# Starts a thread running #inputworker for +plugin+ and records it in
# @input_threads.
#
# @param plugin [Object] the input plugin to run
# @return [Array<Thread>] @input_threads, including the new thread
def start_input(plugin)
  worker = Thread.new { inputworker(plugin) }
  @input_threads << worker
end
#start_inputs ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/logstash/pipeline.rb', line 116

# Registers and starts every input.
#
# An input that is threadable and asks for more than one thread is cloned
# so that there is one input object per requested thread; the clones are
# appended to @inputs before anything is started.
def start_inputs
  clones = @inputs.flat_map do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times.map { input.clone }
    else
      []
    end
  end
  @inputs += clones

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end
#start_outputs ⇒ Object
143 144 145 146 147 |
# File 'lib/logstash/pipeline.rb', line 143

# Starts the single output worker thread and stores it in @output_threads.
#
# @return [Array<Thread>] a one-element array holding the output thread
def start_outputs
  worker = Thread.new { outputworker }
  @output_threads = [worker]
end
#started? ⇒ Boolean
50 51 52 |
# File 'lib/logstash/pipeline.rb', line 50

# Reports the @started flag, which #run sets to true as its first step.
#
# @return [Boolean, nil] the current value of @started
def started?
  @started
end
#wait_filters ⇒ Object
102 103 104 |
# File 'lib/logstash/pipeline.rb', line 102

# Joins every filter worker thread. Does nothing when the filter threads
# were never created.
#
# @return [Array<Thread>, nil] the joined threads, or nil if there are none
def wait_filters
  return unless @filter_threads

  @filter_threads.each { |worker| worker.join }
end
#wait_inputs ⇒ Object
Waits for all input threads to finish; calls #shutdown if interrupted.
88 89 90 91 92 93 94 95 96 |
# File 'lib/logstash/pipeline.rb', line 88

# Joins every input thread. If an Interrupt arrives while waiting, calls
# #shutdown and returns its result instead.
#
# @return [Object] the joined threads, or the result of #shutdown
def wait_inputs
  begin
    @input_threads.each { |worker| worker.join }
  rescue Interrupt
    # rbx does weird things during do SIGINT that I haven't debugged
    # so we catch Interrupt here and signal a shutdown. For some reason the
    # signal handler isn't invoked it seems? I dunno, haven't looked much into
    # it.
    shutdown
  end
end
#wait_outputs ⇒ Object
111 112 113 114 |
# File 'lib/logstash/pipeline.rb', line 111

# Blocks until every output thread has finished.
#
# @return [Array<Thread>] the joined output threads
def wait_outputs
  @output_threads.each { |worker| worker.join }
end