Class: LogStash::Outputs::MicrosoftSentinelOutputInternal::LogStashCompressedStream

Inherits:
LogStashEventsBatcher
  • Object
show all
Includes:
CustomSizeBasedBuffer
Defined in:
lib/logstash/sentinel_la/logStashCompressedStream.rb

Instance Method Summary collapse

Methods included from CustomSizeBasedBuffer

#buffer_full?, #buffer_initialize, #buffer_receive

Methods inherited from LogStashEventsBatcher

#batch_event_document

Constructor Details

#initialize(logstashLoganalyticsConfiguration) ⇒ LogStashCompressedStream

This is a basic memory-based buffer with size and time bounds. Events are compressed when entering the buffer.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/logstash/sentinel_la/logStashCompressedStream.rb', line 13

# Creates the compression buffer, the deflater and the stream bookkeeping
# state, configures the buffer through buffer_initialize, and finally hands
# the same argument on to the parent initializer via bare `super`.
#
# @param logstashLoganalyticsConfiguration [Object] plugin configuration; this
#   method reads max_items, plugin_flush_interval, logger and MAX_SIZE_BYTES
#   from it
def initialize(logstashLoganalyticsConfiguration)
    @compression_buffer = StringIO.new
    @deflater = Zlib::Deflate.new

    # Counters and timestamps for the compressed stream, plus two locks:
    # flush_mutex guards the flush, insert_mutex guards the stream.
    @compression_stream_state = {
        events_in_compression_buffer: 0,
        original_events_size: 0,
        last_flush: Time.now.to_i,
        flush_mutex: Mutex.new,
        insert_mutex: Mutex.new
    }

    # NOTE(review): flush_each is set 1000 below MAX_SIZE_BYTES, presumably to
    # leave headroom under the size limit -- confirm against the buffer mixin.
    buffer_initialize(
        max_items: logstashLoganalyticsConfiguration.max_items,
        max_interval: logstashLoganalyticsConfiguration.plugin_flush_interval,
        logger: logstashLoganalyticsConfiguration.logger,
        flush_each: logstashLoganalyticsConfiguration.MAX_SIZE_BYTES - 1000
    )

    # Bare super forwards logstashLoganalyticsConfiguration to the parent class.
    super
end

Instance Method Details

#batch_event(event_document) ⇒ Object

Adding an event document into the buffer



38
39
40
# File 'lib/logstash/sentinel_la/logStashCompressedStream.rb', line 38

# Adds a single event document to the buffer by passing it to buffer_receive.
#
# @param event_document [Object] the event document to add
# @return [Object] whatever buffer_receive returns
def batch_event(event_document)        
    buffer_receive(event_document)
end

#close ⇒ Object



50
51
52
53
# File 'lib/logstash/sentinel_la/logStashCompressedStream.rb', line 50

# Shuts the stream down with two final flushes, in this order:
# first buffer_flush, then flush_compression_buffer, each called with
# the option final set to true.
#
# @return [Object] whatever flush_compression_buffer returns
def close
    buffer_flush(final: true)
    flush_compression_buffer(final: true)
end

#flush(documents, close = false) ⇒ Object

Adds each of the given documents to the compression buffer while holding the insert mutex.



42
43
44
45
46
47
48
# File 'lib/logstash/sentinel_la/logStashCompressedStream.rb', line 42

# Adds every document of the batch to the compression buffer, one by one,
# while holding the insert mutex so the stream is not touched concurrently.
#
# @param documents [#each] the documents to add
# @param close [Boolean] accepted but not used by this method
# @return [Object] the result of documents.each
def flush(documents, close = false)
    stream_lock = @compression_stream_state[:insert_mutex]
    stream_lock.synchronize do
        documents.each { |doc| add_event_to_compression_buffer(doc) }
    end
end