Class: LogStash::Outputs::SumoLogic::MessageQueue
- Inherits:
-
Object
- Object
- LogStash::Outputs::SumoLogic::MessageQueue
- Includes:
- Common
- Defined in:
- lib/logstash/outputs/sumologic/message_queue.rb
Constant Summary
Constants included from Common
Common::CARBON2, Common::CATEGORY_HEADER, Common::CATEGORY_HEADER_DEFAULT, Common::CLIENT_HEADER, Common::CLIENT_HEADER_VALUE, Common::CONTENT_ENCODING, Common::CONTENT_TYPE, Common::CONTENT_TYPE_CARBON2, Common::CONTENT_TYPE_GRAPHITE, Common::CONTENT_TYPE_LOG, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::HOST_HEADER, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::NAME_HEADER, Common::NAME_HEADER_DEFAULT, Common::STATS_TAG, Common::STOP_TAG
Instance Method Summary collapse
-
#bytesize ⇒ Object
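Returns the value of the queue's byte counter, which enq increases and deq decreases by each batch's payload bytesize.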
-
#deq ⇒ Object
Removes the next batch from the queue, records it in stats, and returns it.
-
#drain ⇒ Object
Dequeues every batch currently in the queue and returns them as an array.
-
#enq(batch) ⇒ Object
Adds a batch to the queue and records it in stats; batches with an empty payload are ignored.
-
#initialize(stats, config) ⇒ MessageQueue
constructor
A new instance of MessageQueue.
-
#size ⇒ Object
Returns the number of batches currently in the queue.
Methods included from Common
#log_dbg, #log_err, #log_info, #log_warn, #set_logger
Constructor Details
#initialize(stats, config) ⇒ MessageQueue
Returns a new instance of MessageQueue.
9 10 11 12 13 14 15 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 9
# Sets up a bounded in-memory queue together with a byte counter.
#
# @param stats  object kept in @stats for later use by this queue
# @param config Hash-like settings; the "queue_max" entry is read, and is
#   written back as 1 when it is missing
def initialize(stats, config)
  # `||=` stores the default in config itself, so the caller sees it too.
  requested_max = (config["queue_max"] ||= 1)
  # Anything below 1 is raised to 1 before it is handed to SizedQueue.
  if requested_max < 1
    @queue_max = 1
  else
    @queue_max = requested_max
  end
  @queue = SizedQueue.new(@queue_max)
  log_info("initialize memory queue", :max => @queue_max)
  @queue_bytesize = Concurrent::AtomicFixnum.new
  @stats = stats
end
Instance Method Details
#bytesize ⇒ Object
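Returns the value of the queue's byte counter, which enq increases and deq decreases by each batch's payload bytesize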
52 53 54 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 52
# Reads the current value of the queue's byte counter.
#
# @return whatever @queue_bytesize.value reports
def bytesize
  @queue_bytesize.value
end
#deq ⇒ Object
Removes the next batch from the queue, records it in stats, and returns it
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 30
# Takes the next batch off the queue, then updates the stats and the byte
# counter to reflect its removal.
#
# NOTE(review): @queue is a SizedQueue (see initialize), so this call
# presumably blocks while the queue is empty — confirm against callers.
#
# @return the batch that was removed from the queue
def deq
  batch = @queue.deq
  batch_size = batch.payload.bytesize
  @stats.record_deque(batch_size)
  @queue_bytesize.update { |v| v - batch_size }
  # Log the counter's numeric value, not the counter object itself, so the
  # debug output shows a plain byte count.
  log_dbg("dequeue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
  batch
end
#drain ⇒ Object
Dequeues every batch currently in the queue and returns them as an array
42 43 44 45 46 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 42
# Empties the queue by calling deq once for every batch that is queued at
# the moment this method starts.
#
# @return [Array] the dequeued batches, in the order deq returned them
def drain
  # The count is taken once, up front; batches added afterwards are left.
  pending = @queue.size
  Array.new(pending) { deq }
end
#enq(batch) ⇒ Object
Adds a batch to the queue and records it in stats; batches with an empty payload are ignored
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 17
# Puts a batch on the queue, then updates the stats and the byte counter.
# A batch whose payload has zero bytes is ignored.
#
# NOTE(review): @queue is a SizedQueue (see initialize), so this call
# presumably blocks while the queue is full — confirm against callers.
#
# @param batch object responding to #payload, whose #bytesize is measured
# @return nil when the batch is skipped, otherwise whatever log_dbg returns
def enq(batch)
  batch_size = batch.payload.bytesize
  return unless batch_size > 0

  @queue.enq(batch)
  @stats.record_enque(batch_size)
  @queue_bytesize.update { |v| v + batch_size }
  # Log the counter's numeric value, not the counter object itself, so the
  # debug output shows a plain byte count.
  log_dbg("enqueue",
    :objects_in_queue => size,
    :bytes_in_queue => @queue_bytesize.value,
    :size => batch_size)
end
#size ⇒ Object
Returns the number of batches currently in the queue
48 49 50 |
# File 'lib/logstash/outputs/sumologic/message_queue.rb', line 48
# Reports how many batches the underlying queue currently holds.
#
# @return whatever @queue.size reports
def size
  @queue.size
end