Module: LogStash::Outputs::MicrosoftSentinelOutputInternal::CustomSizeBasedBuffer
- Included in:
- LogStashAutoResizeBuffer, LogStashCompressedStream
- Defined in:
- lib/logstash/sentinel_la/customSizeBasedBuffer.rb
Instance Method Summary
-
#buffer_flush(options = {}) ⇒ Fixnum
Try to flush events.
-
#buffer_full? ⇒ bool
Determine if :max_items or :flush_each has been reached.
-
#buffer_initialize(options = {}) ⇒ Object
Initialize the buffer.
-
#buffer_receive(event, group = nil) ⇒ Object
Save an event for later delivery.
Instance Method Details
#buffer_flush(options = {}) ⇒ Fixnum
Try to flush events.
Returns immediately if flushing is not necessary/possible at the moment:
-
:max_items or :flush_each have not been accumulated
-
:max_interval seconds have not elapsed since the last flush
-
another flush is in progress
buffer_flush(:force => true) will cause a flush to occur even if :max_items, :flush_each, or :max_interval have not been reached. A forced flush will still return immediately (without flushing) if another flush is currently in progress.
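The early-return conditions and the effect of :force can be condensed into a single predicate. The following is a minimal sketch, not part of the module: flush_due? and its keyword arguments are hypothetical names, and the defaults are borrowed from the documented :max_items, :flush_each and :max_interval options.

```ruby
# Hypothetical helper mirroring the early-return guard in buffer_flush.
# It is not part of the module; names and keyword arguments are illustrative.
def flush_due?(pending_count:, pending_size:, elapsed:,
               max_items: 50, flush_each: 0, max_interval: 5, force: false)
  return false if pending_count == 0         # nothing pending, even when forced
  return true  if force                      # :force / :final bypass the thresholds
  return true  if pending_count >= max_items # item-count threshold reached
  # Size threshold only applies when :flush_each is non-zero.
  return true  if flush_each != 0 && pending_size >= flush_each
  elapsed >= max_interval                    # time threshold reached
end

# One pending event, no threshold reached: no flush unless forced.
idle   = flush_due?(pending_count: 1, pending_size: 10, elapsed: 0)
forced = flush_due?(pending_count: 1, pending_size: 10, elapsed: 0, force: true)
```

Note that an empty buffer is never flushed, with or without :force, because the pending-count check comes first.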
buffer_flush(:final => true) is identical to buffer_flush(:force => true), except that if another flush is already in progress, buffer_flush(:final => true) will block/wait for the other flush to finish before proceeding.
# File 'lib/logstash/sentinel_la/customSizeBasedBuffer.rb', line 189

def buffer_flush(options={})
  force = options[:force] || options[:final]
  final = options[:final]

  # final flush will wait for lock, so we are sure to flush out all buffered events
  if options[:final]
    @buffer_state[:flush_mutex].lock
  elsif ! @buffer_state[:flush_mutex].try_lock # failed to get lock, another flush already in progress
    return 0
  end

  items_flushed = 0

  begin
    return 0 if @buffer_state[:pending_count] == 0

    # compute time_since_last_flush only when some item is pending
    time_since_last_flush = get_time_since_last_flush

    return 0 if (!force) &&
       (@buffer_state[:pending_count] < @buffer_config[:max_items]) &&
       (@buffer_config[:flush_each] == 0 || @buffer_state[:pending_size] < @buffer_config[:flush_each]) &&
       (time_since_last_flush < @buffer_config[:max_interval])

    @buffer_state[:pending_mutex].synchronize do
      @buffer_state[:outgoing_items] = @buffer_state[:pending_items]
      @buffer_state[:outgoing_count] = @buffer_state[:pending_count]
      @buffer_state[:outgoing_size] = @buffer_state[:pending_size]
      buffer_clear_pending
    end
    @buffer_config[:logger].debug("Flushing output",
      :outgoing_count => @buffer_state[:outgoing_count],
      :time_since_last_flush => time_since_last_flush,
      :outgoing_events => @buffer_state[:outgoing_items],
      :batch_timeout => @buffer_config[:max_interval],
      :force => force,
      :final => final
    ) if @buffer_config[:logger]

    @buffer_state[:outgoing_items].each do |group, events|
      begin
        if group.nil?
          flush(events,final)
        else
          flush(events, group, final)
        end

        @buffer_state[:outgoing_items].delete(group)
        events_size = events.size
        @buffer_state[:outgoing_count] -= events_size
        if @buffer_config[:flush_each] != 0
          events_volume = 0
          events.each do |event|
            events_volume += var_size(event)
          end
          @buffer_state[:outgoing_size] -= events_volume
        end
        items_flushed += events_size

      rescue => e
        @buffer_config[:logger].warn("Failed to flush outgoing items",
          :outgoing_count => @buffer_state[:outgoing_count],
          :exception => e,
          :backtrace => e.backtrace
        ) if @buffer_config[:logger]

        if @buffer_config[:has_on_flush_error]
          on_flush_error e
        end

        sleep 1
        retry
      end
      @buffer_state[:last_flush] = Time.now.to_i
    end

  ensure
    @buffer_state[:flush_mutex].unlock
  end

  return items_flushed
end
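The difference between a forced and a final flush comes down to Mutex#try_lock versus Mutex#lock on the flush mutex. The sketch below illustrates that difference in isolation; the thread, queue, and result variables are illustrative and do not appear in the module.

```ruby
require 'thread'

# Illustrative only: contrasts Mutex#try_lock (normal and forced flushes)
# with Mutex#lock (final flushes). None of these names exist in the module.
flush_mutex = Mutex.new
started     = Queue.new

# Simulate another flush that is already in progress.
holder = Thread.new do
  flush_mutex.synchronize do
    started << true   # signal that the lock is held
    sleep 0.1         # pretend to flush for a while
  end
end
started.pop           # wait until the other "flush" holds the lock

# Forced flush: try_lock returns false right away, so the caller skips.
forced_result = flush_mutex.try_lock ? :flushed : :skipped

# Final flush: lock blocks until the in-progress flush releases the mutex.
flush_mutex.lock
final_result = :flushed
flush_mutex.unlock

holder.join
```

Here forced_result ends up as :skipped and final_result as :flushed, which matches the documented behaviour: only a final flush waits for a concurrent flush to finish.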
#buffer_full? ⇒ bool
Determine if :max_items or :flush_each has been reached.
buffer_receive calls will block while buffer_full? == true.
# File 'lib/logstash/sentinel_la/customSizeBasedBuffer.rb', line 134

def buffer_full?
  (@buffer_state[:pending_count] + @buffer_state[:outgoing_count] >= @buffer_config[:max_items]) || \
  (@buffer_config[:flush_each] != 0 && @buffer_state[:pending_size] + @buffer_state[:outgoing_size] >= @buffer_config[:flush_each])
end
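A point that is easy to miss is that both pending and outgoing items count toward the limits, so the buffer can report full while a flush is still draining. The following sketch restates the check as a standalone function with sample numbers; full? and the hashes passed to it are hypothetical stand-ins for @buffer_state and @buffer_config.

```ruby
# Hypothetical standalone version of the buffer_full? check.
# state and config stand in for @buffer_state and @buffer_config.
def full?(state, config)
  (state[:pending_count] + state[:outgoing_count] >= config[:max_items]) ||
    (config[:flush_each] != 0 &&
     state[:pending_size] + state[:outgoing_size] >= config[:flush_each])
end

config = { :max_items => 50, :flush_each => 1024 }

# 30 pending + 20 still being flushed reaches :max_items.
by_count = full?({ :pending_count => 30, :outgoing_count => 20,
                   :pending_size => 100, :outgoing_size => 100 }, config)

# 600 + 500 bytes reaches :flush_each even though only 15 items are held.
by_size = full?({ :pending_count => 10, :outgoing_count => 5,
                  :pending_size => 600, :outgoing_size => 500 }, config)

# With :flush_each set to 0 the size check is disabled.
size_off = full?({ :pending_count => 10, :outgoing_count => 5,
                   :pending_size => 600, :outgoing_size => 500 },
                 { :max_items => 50, :flush_each => 0 })
```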
#buffer_initialize(options = {}) ⇒ Object
Initialize the buffer.
Call directly from your constructor if you wish to set some non-default options. Otherwise buffer_initialize will be called automatically during the first buffer_receive call.
Options:
-
:max_items, Max number of items to buffer before flushing. Default 50.
-
:flush_each, Flush once the buffered events reach this many bytes. Default 0 (size-based flushing disabled).
-
:max_interval, Max number of seconds to wait between flushes. Default 5.
-
:logger, A logger to write log messages to. No default. Optional.
# File 'lib/logstash/sentinel_la/customSizeBasedBuffer.rb', line 85

def buffer_initialize(options={})
  if ! self.class.method_defined?(:flush)
    raise ArgumentError, "Any class including Stud::Buffer must define a flush() method."
  end

  @buffer_config = {
    :max_items => options[:max_items] || 50,
    :flush_each => options[:flush_each].to_i || 0,
    :max_interval => options[:max_interval] || 5,
    :logger => options[:logger] || nil,
    :has_on_flush_error => self.class.method_defined?(:on_flush_error),
    :has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive)
  }
  @buffer_state = {
    # items accepted from including class
    :pending_items => {},
    :pending_count => 0,
    :pending_size => 0,

    # guard access to pending_items & pending_count & pending_size
    :pending_mutex => Mutex.new,

    # items which are currently being flushed
    :outgoing_items => {},
    :outgoing_count => 0,
    :outgoing_size => 0,

    # ensure only 1 flush is operating at once
    :flush_mutex => Mutex.new,

    # data for timed flushes
    :last_flush => Time.now.to_i,
    :timer => Thread.new do
      loop do
        sleep(@buffer_config[:max_interval])
        buffer_flush(:force => true)
      end
    end
  }

  # events we've accumulated
  buffer_clear_pending
end
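To make the option defaults concrete, here is a minimal sketch of how the configuration hash resolves for an empty and a customised options hash. build_buffer_config is a hypothetical helper written for this illustration; it reproduces only the four documented options and leaves out the callback flags and the timer thread.

```ruby
# Hypothetical helper reproducing the option defaults of buffer_initialize.
# It is an illustration only and does not start a timer thread.
def build_buffer_config(options = {})
  {
    :max_items    => options[:max_items] || 50,
    # nil.to_i is 0, so a missing :flush_each already resolves to 0 here.
    :flush_each   => options[:flush_each].to_i,
    :max_interval => options[:max_interval] || 5,
    :logger       => options[:logger]
  }
end

defaults = build_buffer_config
custom   = build_buffer_config(:max_items => 2000,
                               :flush_each => "1048576",
                               :max_interval => 10)
```

Because of the to_i conversion, a :flush_each value supplied as a numeric string is accepted and converted to an integer, and an omitted value becomes 0.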
#buffer_receive(event, group = nil) ⇒ Object
Save an event for later delivery.
Events are grouped by the (optional) group parameter you provide. Groups of events, plus the group name, are later passed to flush.
This call will block if :max_items or :flush_each has been reached.
# File 'lib/logstash/sentinel_la/customSizeBasedBuffer.rb', line 151

def buffer_receive(event, group=nil)
  buffer_initialize if ! @buffer_state

  # block if we've accumulated too many events
  while buffer_full? do
    on_full_buffer_receive(
      :pending => @buffer_state[:pending_count],
      :outgoing => @buffer_state[:outgoing_count]
    ) if @buffer_config[:has_on_full_buffer_receive]
    sleep 0.1
  end
  @buffer_state[:pending_mutex].synchronize do
    @buffer_state[:pending_items][group] << event
    @buffer_state[:pending_count] += 1
    @buffer_state[:pending_size] += var_size(event) if @buffer_config[:flush_each] != 0
  end

  buffer_flush
end
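The grouping behaviour can be sketched on its own. The example below assumes the pending-items hash creates an empty array for each new group, which the `<<` call in buffer_receive implies but which lives in buffer_clear_pending and is not shown on this page; receive and flush_calls are hypothetical names used only here.

```ruby
# Assumption: pending items behave like a Hash that creates an empty Array
# for each new group. The real setup is in buffer_clear_pending (not shown).
pending_items = Hash.new { |hash, group| hash[group] = [] }

# Hypothetical stand-in for the append step inside buffer_receive.
def receive(pending_items, event, group = nil)
  pending_items[group] << event
end

receive(pending_items, "event-1")            # no group: stored under nil
receive(pending_items, "event-2", "errors")
receive(pending_items, "event-3", "errors")

# Record the arguments each flush call would receive, following the
# group.nil? branch in buffer_flush (the trailing final flag is omitted).
flush_calls = pending_items.map do |group, events|
  group.nil? ? [events] : [events, group]
end
```

Events received without a group are stored under the nil key and flushed without a group argument, while grouped events are delivered together with their group name.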