Class: LogStash::Outputs::CloudWatchLogs::Buffer
- Inherits: Object
  - Object
  - LogStash::Outputs::CloudWatchLogs::Buffer
- Defined in: lib/logstash/outputs/cloudwatchlogs.rb
Overview
This class buffers a series of single items into batches and puts the batches on a SizedQueue for consumption. A buffer consists of an ongoing batch and an out queue. An item is first added to the ongoing batch; when the ongoing batch becomes ready for consumption, it is added to the out queue and the ongoing batch is emptied. An ongoing batch becomes ready for consumption if adding one more item would make the number of items exceed max_batch_count or the size of the items exceed max_batch_size, or if the batch has been open for more than buffer_duration milliseconds and holds at least one item.
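The readiness rule above can be written as a single predicate. The following is a minimal sketch, not the plugin's code: `batch_ready?`, its keyword arguments, and the `LIMITS` values are hypothetical, and sizes are treated as plain integers. It assumes the time-based condition only applies when buffer_duration is greater than zero, which matches the constructor starting its timer thread only in that case.

```ruby
# Hypothetical helper (not part of the plugin): reports whether the ongoing
# batch is ready for consumption given its current state and the next item.
def batch_ready?(count:, size:, next_item_size:, opened_ms:,
                 max_batch_count:, max_batch_size:, buffer_duration:)
  # One more item would exceed the item-count limit.
  return true if count + 1 > max_batch_count
  # One more item would exceed the total-size limit.
  return true if size + next_item_size > max_batch_size
  # The batch has been open longer than buffer_duration and is not empty.
  return true if buffer_duration > 0 && count > 0 && opened_ms > buffer_duration
  false
end

# Illustrative limits (assumptions, not plugin defaults).
LIMITS = { max_batch_count: 3, max_batch_size: 100, buffer_duration: 5000 }.freeze

READY_BY_COUNT = batch_ready?(count: 3, size: 30, next_item_size: 10, opened_ms: 0, **LIMITS)
READY_BY_SIZE  = batch_ready?(count: 1, size: 95, next_item_size: 10, opened_ms: 0, **LIMITS)
READY_BY_AGE   = batch_ready?(count: 1, size: 10, next_item_size: 10, opened_ms: 6000, **LIMITS)
NOT_READY      = batch_ready?(count: 1, size: 10, next_item_size: 10, opened_ms: 100, **LIMITS)
EMPTY_BUT_OLD  = batch_ready?(count: 0, size: 0, next_item_size: 10, opened_ms: 6000, **LIMITS)
```

An empty batch never becomes ready through age alone, which is why `EMPTY_BUT_OLD` is false in this sketch.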
Constant Summary
- CLOSE_BATCH = :close
Instance Attribute Summary
- #in_batch ⇒ Object (readonly): Returns the value of attribute in_batch.
- #in_count ⇒ Object (readonly): Returns the value of attribute in_count.
- #in_size ⇒ Object (readonly): Returns the value of attribute in_size.
- #out_queue ⇒ Object (readonly): Returns the value of attribute out_queue.
Instance Method Summary
- #close ⇒ Object: Closes the buffer.
- #deq(&proc) ⇒ Object: Dequeues batches that are ready for consumption.
- #enq(item) ⇒ Object: Enqueues an item into the buffer.
- #initialize(options = {}) ⇒ Buffer (constructor): Creates a new buffer.
Constructor Details
#initialize(options = {}) ⇒ Buffer
Creates a new buffer
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 298
    def initialize(options = {})
      @max_batch_count = options.fetch(:max_batch_count)
      @max_batch_size = options.fetch(:max_batch_size)
      @buffer_duration = options.fetch(:buffer_duration)
      @out_queue_size = options.fetch(:out_queue_size, 10)
      @logger = options.fetch(:logger, nil)
      @size_of_item_proc = options.fetch(:size_of_item_proc)
      @in_batch = Array.new
      @in_count = 0
      @in_size = 0
      @out_queue = SizedQueue.new(@out_queue_size)
      @batch_update_mutex = Mutex.new
      @last_batch_time = Time.now
      if @buffer_duration > 0
        @scheduled_batcher = Thread.new do
          loop do
            sleep(@buffer_duration / 1000.0)
            enq(:scheduled)
          end
        end
      end
    end
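Because the constructor reads its options with `Hash#fetch`, the keys fetched without a default (`:max_batch_count`, `:max_batch_size`, `:buffer_duration`, `:size_of_item_proc`) are required and raise `KeyError` when absent, while `:out_queue_size` and `:logger` fall back to `10` and `nil`. The sketch below shows this behaviour on a plain hash without loading the plugin. The option values and the sizing lambda are illustrative assumptions.

```ruby
# Illustrative option values (assumptions, not plugin defaults).
OPTIONS = {
  max_batch_count: 10_000,
  max_batch_size: 1_048_576,
  buffer_duration: 5000,                          # milliseconds
  size_of_item_proc: ->(item) { item.bytesize }   # hypothetical sizing rule
}.freeze

# Optional keys: fetch returns the supplied default when the key is absent.
OUT_QUEUE_SIZE = OPTIONS.fetch(:out_queue_size, 10)
LOGGER = OPTIONS.fetch(:logger, nil)

# Required keys: fetch without a default raises KeyError when the key is absent.
MISSING_KEY_ERROR =
  begin
    {}.fetch(:max_batch_count)
    nil
  rescue KeyError => e
    e
  end

# The sizing proc is called once per item to measure it.
ITEM_SIZE = OPTIONS.fetch(:size_of_item_proc).call("hello")
```

Passing an incomplete options hash therefore fails at construction time rather than later during batching.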
Instance Attribute Details
#in_batch ⇒ Object (readonly)
Returns the value of attribute in_batch.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295
    def in_batch
      @in_batch
    end
#in_count ⇒ Object (readonly)
Returns the value of attribute in_count.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295
    def in_count
      @in_count
    end
#in_size ⇒ Object (readonly)
Returns the value of attribute in_size.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295
    def in_size
      @in_size
    end
#out_queue ⇒ Object (readonly)
Returns the value of attribute out_queue.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295
    def out_queue
      @out_queue
    end
Instance Method Details
#close ⇒ Object
Closes the buffer
Adds the current batch to the out queue, then adds CLOSE_BATCH to the queue. Waits until the consumer completes.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 347
    def close
      while @in_size != 0 do
        enq(:close)
        sleep(1)
      end
      @out_queue.enq(CLOSE_BATCH)
    end
#deq(&proc) ⇒ Object
Dequeues batches that are ready for consumption
The caller blocks on this call until the buffer is closed.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 358
    def deq(&proc)
      loop do
        batch = @out_queue.deq
        if batch == CLOSE_BATCH
          break
        end
        proc.call(batch)
      end
    end
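The interplay between #close and #deq is a sentinel-terminated producer/consumer loop over a bounded queue. Here is a minimal, self-contained sketch of that pattern using only the standard `SizedQueue`. `CLOSE_SENTINEL` and `run_pipeline` are hypothetical names, and the queue capacity of 2 is an arbitrary choice.

```ruby
require 'thread'

# Sentinel marking the end of the stream, playing the role of CLOSE_BATCH.
CLOSE_SENTINEL = :close

# Hypothetical helper: pushes batches through a bounded queue to a consumer
# thread and returns the batches in the order the consumer received them.
def run_pipeline(batches)
  queue = SizedQueue.new(2)   # producers block in enq while the queue is full
  consumed = []

  consumer = Thread.new do
    loop do
      batch = queue.deq       # blocks until a batch is available
      break if batch == CLOSE_SENTINEL
      consumed << batch
    end
  end

  batches.each { |batch| queue.enq(batch) }
  queue.enq(CLOSE_SENTINEL)   # tells the consumer to stop
  consumer.join
  consumed
end
```

Calling `run_pipeline([[1, 2], [3], [4, 5, 6]])` returns the three batches in order. The consumer thread exits only after it has seen the sentinel, which is why a caller of #deq blocks until the buffer is closed.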
#enq(item) ⇒ Object
Enqueues an item into the buffer
- If the ongoing batch is not full after this addition, adds the item to the batch.
- If the ongoing batch is full after this addition, adds the item to the batch and adds the batch to the out queue.
- If the ongoing batch would overflow with this addition, adds the batch to the out queue, and then adds the item to a new batch.
    # File 'lib/logstash/outputs/cloudwatchlogs.rb', line 327
    def enq(item)
      @batch_update_mutex.synchronize do
        if item == :scheduled || item == :close
          add_current_batch_to_out_queue(item)
          return
        end
        status = try_add_item(item)
        if status != 0
          add_current_batch_to_out_queue(:add)
          if status == -1
            try_add_item(item)
          end
        end
      end
    end
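The three cases described for #enq can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not the plugin's implementation: `try_add_item` and `add_current_batch_to_out_queue` are not shown on this page, so `SketchBuffer` and `sketch_batches` are hypothetical, item size is taken to be `String#bytesize`, a plain array replaces the out queue, and there is no mutex or timer.

```ruby
# Hypothetical stand-in for the buffer (single-threaded, no timer).
class SketchBuffer
  attr_reader :batch, :flushed

  def initialize(max_count:, max_size:)
    @max_count = max_count
    @max_size = max_size
    @batch = []     # the ongoing batch
    @size = 0       # total size of the items in the ongoing batch
    @flushed = []   # stands in for the out queue
  end

  def enq(item)
    # Case 3: the item would overflow the ongoing batch, so hand the batch
    # over first and let the item start a new batch.
    flush if @size + item.bytesize > @max_size
    @batch << item
    @size += item.bytesize
    # Case 2: the batch is full after this addition, so hand it over.
    # Case 1: otherwise the item simply stays in the ongoing batch.
    flush if @batch.size >= @max_count || @size >= @max_size
  end

  private

  # Moves the ongoing batch to the flushed list and starts an empty one.
  def flush
    return if @batch.empty?
    @flushed << @batch
    @batch = []
    @size = 0
  end
end

# Feeds items through a SketchBuffer and returns [flushed batches, ongoing batch].
def sketch_batches(items, max_count:, max_size:)
  buffer = SketchBuffer.new(max_count: max_count, max_size: max_size)
  items.each { |item| buffer.enq(item) }
  [buffer.flushed, buffer.batch]
end
```

With `max_count: 2` and `max_size: 5`, the items `aa bb cc dddd e` are flushed as `[aa, bb]` (full by count), `[cc]` (flushed because `dddd` would overflow), and `[dddd, e]` (full by count and size), leaving the ongoing batch empty.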