Class: LogStash::Outputs::CloudWatchLogs::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/cloudwatchlogs.rb

Overview

This class buffers series of single item to batches and puts batches to a SizedQueue for consumption. A buffer includes an ongoing batch and an out queue. An item is added to the ongoing batch first, when the ongoing batch becomes to ready for consumption and then is added to out queue/emptified. An ongoing batch becomes to comsumption ready if the number of items is going to exceed max_batch_count, or the size of items is going to exceed max_batch_size, with the addition of one more item, or the batch has opend more than buffer_duration milliseconds and has at least one item.

Constant Summary collapse

CLOSE_BATCH =
:close

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Buffer

Creates a new buffer



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 298

def initialize(options = {})
  @max_batch_count = options.fetch(:max_batch_count)
  @max_batch_size = options.fetch(:max_batch_size)
  @buffer_duration = options.fetch(:buffer_duration)
  @out_queue_size = options.fetch(:out_queue_size, 10)
  @logger = options.fetch(:logger, nil)
  @size_of_item_proc = options.fetch(:size_of_item_proc)
  @in_batch = Array.new
  @in_count = 0
  @in_size = 0
  @out_queue = SizedQueue.new(@out_queue_size)
  @batch_update_mutex = Mutex.new
  @last_batch_time = Time.now
  if @buffer_duration > 0
    @scheduled_batcher = Thread.new do
      loop do
        sleep(@buffer_duration / 1000.0)
        enq(:scheduled)
      end
    end
  end
end

Instance Attribute Details

#in_batchObject (readonly)

Returns the value of attribute in_batch.



295
296
297
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295

def in_batch
  @in_batch
end

#in_countObject (readonly)

Returns the value of attribute in_count.



295
296
297
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295

def in_count
  @in_count
end

#in_sizeObject (readonly)

Returns the value of attribute in_size.



295
296
297
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295

def in_size
  @in_size
end

#out_queueObject (readonly)

Returns the value of attribute out_queue.



295
296
297
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 295

def out_queue
  @out_queue
end

Instance Method Details

#closeObject

Closes the buffer

Adds current batch to the queue and adds CLOSE_BATCH to queue. Waits until consumer completes.



347
348
349
350
351
352
353
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 347

def close
  while @in_size != 0 do
    enq(:close)
    sleep(1)
  end
  @out_queue.enq(CLOSE_BATCH)
end

#deq(&proc) ⇒ Object

Deques ready for consumption batches

The caller blocks on this call until the buffer is closed.



358
359
360
361
362
363
364
365
366
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 358

def deq(&proc)
  loop do
    batch = @out_queue.deq
    if batch == CLOSE_BATCH
      break
    end
    proc.call(batch)
  end
end

#enq(item) ⇒ Object

Enques an item to buffer

  • If ongoing batch is not full with this addition, adds item to batch.

  • If ongoing batch is full with this addition, adds item to batch and add batch to out queue.

  • If ongoing batch is going to overflow with this addition, adds batch to out queue,

and then adds the item to the new batch



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/logstash/outputs/cloudwatchlogs.rb', line 327

def enq(item)
  @batch_update_mutex.synchronize do
    if item == :scheduled || item == :close
      add_current_batch_to_out_queue(item)
      return
    end
    status = try_add_item(item)
    if status != 0
      add_current_batch_to_out_queue(:add)
      if status == -1
        try_add_item(item)
      end
    end
  end
end