Class: LogStash::Outputs::SumoLogic::Piler

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/logstash/outputs/sumologic/piler.rb

Constant Summary

Constants included from Common

Common::CARBON2, Common::CATEGORY_HEADER, Common::CATEGORY_HEADER_DEFAULT, Common::CLIENT_HEADER, Common::CLIENT_HEADER_VALUE, Common::CONTENT_ENCODING, Common::CONTENT_TYPE, Common::CONTENT_TYPE_CARBON2, Common::CONTENT_TYPE_GRAPHITE, Common::CONTENT_TYPE_LOG, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::HOST_HEADER, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::NAME_HEADER, Common::NAME_HEADER_DEFAULT, Common::STATS_TAG, Common::STOP_TAG

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Common

#log_dbg, #log_err, #log_info, #log_warn, #set_logger

Constructor Details

#initialize(queue, stats, config) ⇒ Piler

Returns a new instance of Piler.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/logstash/outputs/sumologic/piler.rb', line 13

def initialize(queue, stats, config)
  
  @interval = config["interval"] ||= 0
  @pile_max = config["pile_max"] ||= 0
  @queue = queue
  @stats = stats
  @stopping = Concurrent::AtomicBoolean.new(false)
  @payload_builder = PayloadBuilder.new(@stats, config)
  @header_builder = HeaderBuilder.new(config)
  @is_pile = (@interval > 0 && @pile_max > 0)
  if (@is_pile)
    @pile = Hash.new("")
    @semaphore = Mutex.new
  end
  
end

Instance Attribute Details

#is_pileObject (readonly)

Returns the value of attribute is_pile.



11
12
13
# File 'lib/logstash/outputs/sumologic/piler.rb', line 11

def is_pile
  @is_pile
end

Instance Method Details

#input(event) ⇒ Object

def stop



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/outputs/sumologic/piler.rb', line 55

def input(event)
  if (@stopping.true?)
    log_warn("piler is shutting down, event is dropped", 
      "event" => event)
  else
    headers = @header_builder.build(event)
    payload = @payload_builder.build(event)
    if (@is_pile)
      @semaphore.synchronize {
        content = @pile[headers]
        size = content.bytesize
        if size + payload.bytesize > @pile_max
          @queue.enq(Batch.new(headers, content))
          @pile[headers] = ""
        end
        @pile[headers] = @pile[headers].blank? ? payload : "#{@pile[headers]}\n#{payload}"
      }
    else
      @queue.enq(Batch.new(headers, payload))
    end # if
  end
end

#startObject

def initialize



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/outputs/sumologic/piler.rb', line 30

def start()
  @stopping.make_false()
  if (@is_pile)
    log_info("starting piler...", 
      :max => @pile_max, 
      :timeout => @interval)
    @piler_t = Thread.new { 
      while @stopping.false?
        Stud.stoppable_sleep(@interval) { @stopping.true? }
        log_dbg("timeout", :timeout => @interval)
        enq_and_clear()
      end # while
    }
  end # if
end

#stopObject

def start



46
47
48
49
50
51
52
53
# File 'lib/logstash/outputs/sumologic/piler.rb', line 46

def stop()
  @stopping.make_true()
  if (@is_pile)
    log_info("shutting down piler in #{@interval * 2} secs ...")
    @piler_t.join(@interval * 2)
    log_info("piler is fully shutted down")
  end
end