Class: LogStash::Outputs::SumoLogic::Piler
- Inherits:
- Object
- Object → LogStash::Outputs::SumoLogic::Piler
- Includes:
- Common
- Defined in:
- lib/logstash/outputs/sumologic/piler.rb
Constant Summary
Constants included from Common
Common::CARBON2, Common::CATEGORY_HEADER, Common::CATEGORY_HEADER_DEFAULT, Common::CLIENT_HEADER, Common::CLIENT_HEADER_VALUE, Common::CONTENT_ENCODING, Common::CONTENT_TYPE, Common::CONTENT_TYPE_CARBON2, Common::CONTENT_TYPE_GRAPHITE, Common::CONTENT_TYPE_LOG, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::HOST_HEADER, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::NAME_HEADER, Common::NAME_HEADER_DEFAULT, Common::STATS_TAG, Common::STOP_TAG
Instance Attribute Summary
- #is_pile ⇒ Object (readonly)
  Returns the value of attribute is_pile.
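Instance Method Summary
- #initialize(queue, stats, config) ⇒ Piler (constructor)
  A new instance of Piler.
- #input(event) ⇒ Object
  Adds an event to the pile, or enqueues it directly when piling is disabled.
- #start ⇒ Object
  Starts the background thread that flushes the pile once per interval (only when piling is enabled).
- #stop ⇒ Object
  Signals shutdown and waits up to twice the interval for the piler thread to exit.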
Methods included from Common
#log_dbg, #log_err, #log_info, #log_warn, #set_logger
Constructor Details
#initialize(queue, stats, config) ⇒ Piler
Returns a new instance of Piler.
# File 'lib/logstash/outputs/sumologic/piler.rb', line 13
def initialize(queue, stats, config)
  @interval = config["interval"] ||= 0
  @pile_max = config["pile_max"] ||= 0
  @queue = queue
  @stats = stats
  @stopping = Concurrent::AtomicBoolean.new(false)
  @payload_builder = PayloadBuilder.new(@stats, config)
  @header_builder = HeaderBuilder.new(config)
  @is_pile = (@interval > 0 && @pile_max > 0)
  if (@is_pile)
    @pile = Hash.new("")
    @semaphore = Mutex.new
  end
end
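The constructor turns piling on only when both settings are positive. A minimal standalone sketch of that check, assuming the config is a plain Hash with string keys as in the listing; `piling_enabled?` is a hypothetical helper written for illustration and is not part of the plugin:

```ruby
# Hypothetical helper mirroring the constructor's condition; not part of the plugin.
def piling_enabled?(config)
  interval = config["interval"] || 0   # a missing key is treated as 0
  pile_max = config["pile_max"] || 0
  interval > 0 && pile_max > 0
end

puts piling_enabled?({ "interval" => 5, "pile_max" => 1024 })  # prints true
puts piling_enabled?({ "interval" => 5 })                      # prints false
```

Unlike the constructor, which uses `||=` and therefore writes the default `0` back into `config`, this sketch leaves the hash untouched.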
Instance Attribute Details
#is_pile ⇒ Object (readonly)
Returns the value of attribute is_pile, which is true when both the interval and pile_max settings are greater than zero.
# File 'lib/logstash/outputs/sumologic/piler.rb', line 11
def is_pile
  @is_pile
end
Instance Method Details
#input(event) ⇒ Object
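Builds the headers and payload for the event. When piling is enabled, appends the payload to the pile for those headers, first enqueuing the existing pile as a Batch if the addition would exceed pile_max. When piling is disabled, enqueues the payload as its own Batch. If the piler is stopping, the event is dropped with a warning.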
# File 'lib/logstash/outputs/sumologic/piler.rb', line 55
def input(event)
  if (@stopping.true?)
    log_warn("piler is shutting down, event is dropped", "event" => event)
  else
    headers = @header_builder.build(event)
    payload = @payload_builder.build(event)
    if (@is_pile)
      @semaphore.synchronize {
        content = @pile[headers]
        size = content.bytesize
        if size + payload.bytesize > @pile_max
          @queue.enq(Batch.new(headers, content))
          @pile[headers] = ""
        end
        @pile[headers] = @pile[headers].blank? ? payload : "#{@pile[headers]}\n#{payload}"
      }
    else
      @queue.enq(Batch.new(headers, payload))
    end # if
  end
end
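The piling step in the listing can be sketched in isolation. The following is an illustration under stated assumptions, not the plugin's code: `MiniPiler` is a hypothetical class, a plain Array stands in for the message queue, a two-element array stands in for `Batch`, the header hash is made up, and `String#empty?` replaces the `blank?` call so the sketch runs on plain Ruby.

```ruby
# Standalone sketch of the piling step; MiniPiler is hypothetical.
class MiniPiler
  attr_reader :queue

  def initialize(pile_max)
    @pile_max = pile_max
    @pile = Hash.new("")   # headers => accumulated payload text
    @queue = []            # stands in for the plugin's message queue
    @lock = Mutex.new
  end

  def input(headers, payload)
    @lock.synchronize do
      content = @pile[headers]
      # Flush first if appending would push the pile past the limit.
      if content.bytesize + payload.bytesize > @pile_max
        @queue << [headers, content]
        @pile[headers] = ""
      end
      current = @pile[headers]
      # Payloads that share headers are joined with newlines.
      @pile[headers] = current.empty? ? payload : "#{current}\n#{payload}"
    end
  end
end

piler = MiniPiler.new(10)
headers = { "category" => "demo" }   # illustrative headers only
piler.input(headers, "aaaa")   # pile is now "aaaa"
piler.input(headers, "bbbb")   # 4 + 4 <= 10, pile is now "aaaa\nbbbb"
piler.input(headers, "cccc")   # 9 + 4 > 10, "aaaa\nbbbb" is enqueued, pile is now "cccc"
```

As in the listing, the size check ignores the joining newline, and a payload larger than the limit arriving on an empty pile enqueues an empty entry before being stored; a caller who cares about either case may want an extra guard.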
#start ⇒ Object
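Clears the stopping flag. When piling is enabled, starts a thread that sleeps for interval seconds between calls to enq_and_clear until the piler is stopped.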
# File 'lib/logstash/outputs/sumologic/piler.rb', line 30
def start()
  @stopping.make_false()
  if (@is_pile)
    log_info("starting piler...",
      :max => @pile_max,
      :timeout => @interval)
    @piler_t = Thread.new {
      while @stopping.false?
        Stud.stoppable_sleep(@interval) { @stopping.true? }
        log_dbg("timeout", :timeout => @interval)
        enq_and_clear()
      end # while
    }
  end # if
end
#stop ⇒ Object
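Sets the stopping flag. When piling is enabled, waits up to interval * 2 seconds for the piler thread to finish.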
# File 'lib/logstash/outputs/sumologic/piler.rb', line 46
def stop()
  @stopping.make_true()
  if (@is_pile)
    log_info("shutting down piler in #{@interval * 2} secs ...")
    @piler_t.join(@interval * 2)
    log_info("piler is fully shutted down")
  end
end
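The start/stop lifecycle shown in the listings (a worker that flushes once per interval, an interruptible sleep, and a bounded join on shutdown) can be sketched with core Ruby only. This is an approximation under stated assumptions: `PeriodicFlusher` is a hypothetical class, a `Mutex` plus `ConditionVariable` stands in for `Concurrent::AtomicBoolean` and `Stud.stoppable_sleep`, and a counter stands in for `enq_and_clear`.

```ruby
# Standalone sketch of the start/stop lifecycle; PeriodicFlusher is hypothetical.
class PeriodicFlusher
  attr_reader :flushes

  def initialize(interval)
    @interval = interval
    @stopping = false
    @flushes = 0
    @lock = Mutex.new
    @wakeup = ConditionVariable.new
  end

  def start
    @lock.synchronize { @stopping = false }
    @thread = Thread.new do
      until stopping?
        wait_for_interval
        flush
      end
    end
  end

  # Returns the thread if it exited in time, or nil if the join timed out.
  def stop
    @lock.synchronize do
      @stopping = true
      @wakeup.signal          # cut the current sleep short
    end
    @thread.join(@interval * 2)
  end

  private

  def stopping?
    @lock.synchronize { @stopping }
  end

  # Sleeps up to @interval seconds, returning early when stop signals.
  def wait_for_interval
    @lock.synchronize do
      @wakeup.wait(@lock, @interval) unless @stopping
    end
  end

  # Stand-in for enq_and_clear: only counts how often it ran.
  def flush
    @lock.synchronize { @flushes += 1 }
  end
end

flusher = PeriodicFlusher.new(0.05)
flusher.start
sleep 0.2
stopped = flusher.stop
```

As in the original loop, a stop request interrupts the sleep and one more flush runs before the thread exits, so content piled since the last interval is not left behind.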