Class: LogStash::Codecs::Multiline

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::EventSupport::EventFactoryAdapter
Defined in:
lib/logstash/codecs/multiline.rb

Instance Method Summary collapse

Constructor Details

#initialize(*params) ⇒ Multiline

Returns a new instance of Multiline.



151
152
153
154
155
# File 'lib/logstash/codecs/multiline.rb', line 151

def initialize(*params)
  super

  @original_field = ecs_select[disabled: nil, v1: '[event][original]']
end

Instance Method Details

#accept(listener) ⇒ Object



200
201
202
203
204
205
206
207
# File 'lib/logstash/codecs/multiline.rb', line 200

def accept(listener)
  # memoize references to listener that holds upstream state
  @previous_listener = @last_seen_listener || listener
  @last_seen_listener = listener
  decode(listener.data) do |event|
    what_based_listener.process_event(event)
  end
end

#auto_flush(listener = @last_seen_listener) ⇒ Object



244
245
246
247
248
249
250
# File 'lib/logstash/codecs/multiline.rb', line 244

def auto_flush(listener = @last_seen_listener)
  return if listener.nil?

  flush do |event|
    listener.process_event(event)
  end
end

#auto_flush_active?Boolean

Returns:

  • (Boolean)


308
309
310
# File 'lib/logstash/codecs/multiline.rb', line 308

def auto_flush_active?
  !@auto_flush_interval.nil?
end

#auto_flush_runnerObject



312
313
314
# File 'lib/logstash/codecs/multiline.rb', line 312

def auto_flush_runner
  @auto_flush_runner || AutoFlushUnset.new(nil, nil)
end

#buffer(text) ⇒ Object

def decode



222
223
224
225
# File 'lib/logstash/codecs/multiline.rb', line 222

def buffer(text)
  @buffer_bytes += text.bytesize
  @buffer.push(text)
end

#buffer_over_limits?Boolean

Returns:

  • (Boolean)


295
296
297
# File 'lib/logstash/codecs/multiline.rb', line 295

def buffer_over_limits?
  over_maximum_lines? || over_maximum_bytes?
end

#closeObject

def encode



304
305
306
# File 'lib/logstash/codecs/multiline.rb', line 304

def close
  auto_flush_runner.stop
end

#decode(text, &block) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/logstash/codecs/multiline.rb', line 209

def decode(text, &block)
  text = @converter.convert(text)
  text.split("\n").each do |line|
    match = @grok.match(line)
    @logger.debug? && @logger.debug("Multiline", :text => line, :pattern => @pattern,
                                    :match => (match != false), :negate => @negate)

    # Add negate option
    match = (match and !@negate) || (!match and @negate)
    @handler.call(line, match, &block)
  end
end

#do_next(text, matched, &block) ⇒ Object



275
276
277
278
279
# File 'lib/logstash/codecs/multiline.rb', line 275

def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  flush(&block) if !matched || buffer_over_limits?
end

#do_previous(text, matched, &block) ⇒ Object



281
282
283
284
285
# File 'lib/logstash/codecs/multiline.rb', line 281

def do_previous(text, matched, &block)
  flush(&block) if !matched || buffer_over_limits?
  auto_flush_runner.start
  buffer(text)
end

#doing_previous?Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/logstash/codecs/multiline.rb', line 267

def doing_previous?
  @what == "previous"
end

#encode(event) ⇒ Object



299
300
301
302
# File 'lib/logstash/codecs/multiline.rb', line 299

def encode(event)
  # Nothing to do.
  @on_event.call(event, event)
end

#flush(&block) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/logstash/codecs/multiline.rb', line 227

def flush(&block)
  if block_given? && @buffer.any?
    no_error = true
    event = merge_events
    begin
      yield event
    rescue ::Exception => e
      # need to rescue everything
      # likliest cause: backpressure or timeout by exception
      # can't really do anything but leave the data in the buffer for next time if there is one
      @logger.error("Multiline: flush downstream error", :exception => e)
      no_error = false
    end
    reset_buffer if no_error
  end
end

#initialize_copy(source) ⇒ Object



316
317
318
319
# File 'lib/logstash/codecs/multiline.rb', line 316

def initialize_copy(source)
  super
  register
end

#merge_eventsObject



252
253
254
255
256
257
258
259
260
# File 'lib/logstash/codecs/multiline.rb', line 252

def merge_events
  message = @buffer.join(NL)
  event = event_factory.new_event(LogStash::Event::TIMESTAMP => @time, "message" => message)
  event.set @original_field, message.dup.freeze if @original_field
  event.tag @multiline_tag if !@multiline_tag.empty? && @buffer.size > 1
  event.tag "multiline_codec_max_bytes_reached" if over_maximum_bytes?
  event.tag "multiline_codec_max_lines_reached" if over_maximum_lines?
  event
end

#over_maximum_bytes?Boolean

Returns:

  • (Boolean)


291
292
293
# File 'lib/logstash/codecs/multiline.rb', line 291

def over_maximum_bytes?
  @buffer_bytes >= @max_bytes
end

#over_maximum_lines?Boolean

Returns:

  • (Boolean)


287
288
289
# File 'lib/logstash/codecs/multiline.rb', line 287

def over_maximum_lines?
  @buffer.size > @max_lines
end

#registerObject



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/logstash/codecs/multiline.rb', line 157

def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  # Detect if we are running from a jarfile, pick the right path.
  patterns_path = []
  patterns_path += [LogStash::Patterns::Core.path]

  @grok = Grok.new

  @patterns_dir = patterns_path.to_a + @patterns_dir
  @patterns_dir.each do |path|
    if ::File.directory?(path)
      path = ::File.join(path, "*")
    end

    Dir.glob(path).each do |file|
      @logger.debug("Grok loading patterns from file", :path => file)
      @grok.add_patterns_from_file(file)
    end
  end

  @grok.compile(@pattern)
  @logger.trace("Registered multiline plugin", :type => @type, :config => @config)

  reset_buffer

  @handler = method("do_#{@what}".to_sym)

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger
  if @auto_flush_interval
    # will start on first decode
    @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval)
  end
end

#reset_bufferObject



262
263
264
265
# File 'lib/logstash/codecs/multiline.rb', line 262

def reset_buffer
  @buffer = []
  @buffer_bytes = 0
end

#use_mapper_auto_flushObject

def register



194
195
196
197
198
# File 'lib/logstash/codecs/multiline.rb', line 194

def use_mapper_auto_flush
  return unless auto_flush_active?
  @auto_flush_runner = AutoFlushUnset.new(nil, nil)
  @auto_flush_interval = @auto_flush_interval.to_f
end

#what_based_listenerObject



271
272
273
# File 'lib/logstash/codecs/multiline.rb', line 271

def what_based_listener
  doing_previous? ? @previous_listener : @last_seen_listener
end