Class: LogStash::Outputs::SumoLogic::MessageQueue

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/logstash/outputs/sumologic/message_queue.rb

Constant Summary

Constants included from Common

Common::CARBON2, Common::CATEGORY_HEADER, Common::CATEGORY_HEADER_DEFAULT, Common::CLIENT_HEADER, Common::CLIENT_HEADER_VALUE, Common::CONTENT_ENCODING, Common::CONTENT_TYPE, Common::CONTENT_TYPE_CARBON2, Common::CONTENT_TYPE_GRAPHITE, Common::CONTENT_TYPE_LOG, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::HOST_HEADER, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::NAME_HEADER, Common::NAME_HEADER_DEFAULT, Common::STATS_TAG, Common::STOP_TAG

Instance Method Summary collapse

Methods included from Common

#log_dbg, #log_err, #log_info, #log_warn, #set_logger

Constructor Details

#initialize(stats, config) ⇒ MessageQueue

Returns a new instance of MessageQueue.



9
10
11
12
13
14
15
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 9

# Sets up a bounded in-memory queue whose capacity comes from config["queue_max"].
#
# When "queue_max" is missing (nil/false) the default 1 is written back into
# +config+; any value below 1 is raised to 1 for the queue capacity.
#
# @param stats  collaborator kept in @stats (enq/deq call record_enque / record_deque on it)
# @param config [Hash] settings; only the "queue_max" key is read here
def initialize(stats, config)
  requested_max = (config["queue_max"] ||= 1)
  @queue_max = requested_max < 1 ? 1 : requested_max
  @queue = SizedQueue.new(@queue_max)
  log_info("initialize memory queue", max: @queue_max)
  # Running counter adjusted by enq/deq with each batch's payload size.
  @queue_bytesize = Concurrent::AtomicFixnum.new
  @stats = stats
end

Instance Method Details

#bytesize ⇒ Object

Returns the current value of the queue's byte counter.



52
53
54
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 52

# Reports the current value of the queue's byte counter.
#
# @return the +value+ of @queue_bytesize (the counter enq adds payload sizes
#   to and deq subtracts them from)
def bytesize
  @queue_bytesize.value
end

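#deq ⇒ Object

Removes the next batch from the queue, records its payload size in the stats, subtracts that size from the byte counter, and returns the batch.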



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 30

# Removes the next batch from the queue and updates the bookkeeping.
#
# The payload's byte size is reported to @stats via record_deque and
# subtracted from the byte counter, then a debug line is logged.
# NOTE(review): @queue is a SizedQueue (see initialize), whose deq waits
# while the queue is empty — callers should expect this to block.
#
# @return the dequeued batch
def deq
  batch = @queue.deq
  batch_size = batch.payload.bytesize
  @stats.record_deque(batch_size)
  @queue_bytesize.update { |v| v - batch_size }
  # Log the counter's numeric value rather than the atomic wrapper object,
  # so :bytes_in_queue is a plain number like the other fields.
  log_dbg("dequeue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
  batch
end

#drain ⇒ Object

Dequeues as many batches as the queue currently holds and returns them as an array.



42
43
44
45
46
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 42

# Empties the queue by calling deq once for every batch it holds at the
# moment of the call.
#
# The count is sampled once up front, so batches added afterwards are not
# included.
#
# @return [Array] the dequeued batches, in dequeue order
def drain
  Array.new(@queue.size) { deq }
end

#enq(batch) ⇒ Object

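Adds a batch to the queue when its payload is non-empty, recording the payload size in the stats and adding it to the byte counter.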



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 17

# Adds a batch to the queue and updates the bookkeeping.
#
# Batches whose payload is empty (0 bytes) are ignored. Otherwise the
# payload's byte size is reported to @stats via record_enque and added to the
# byte counter, then a debug line is logged.
# NOTE(review): @queue is a SizedQueue (see initialize), whose enq waits
# while the queue is full — callers should expect this to block.
#
# @param batch object responding to +payload+ (whose +bytesize+ is measured)
# @return nil when the batch is skipped, otherwise whatever log_dbg returns
def enq(batch)
  batch_size = batch.payload.bytesize
  return unless batch_size > 0

  @queue.enq(batch)
  @stats.record_enque(batch_size)
  @queue_bytesize.update { |v| v + batch_size }
  # Log the counter's numeric value rather than the atomic wrapper object,
  # so :bytes_in_queue is a plain number like the other fields.
  log_dbg("enqueue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
end

#size ⇒ Object

Returns the number of batches currently in the queue.



48
49
50
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 48

# Number of batches currently waiting in the queue.
#
# @return [Integer] the underlying queue's element count
def size
  @queue.length
end