Class: Fluent::LmOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_lm.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



32
33
34
# File 'lib/fluent/plugin/out_lm.rb', line 32

# Configuration hook for the plugin.
#
# This override adds no validation or setup of its own: the bare `super`
# forwards `conf` unchanged to the parent class implementation.
#
# @param conf configuration object handed to the plugin (described by the
#   accompanying docs as a Hash of configuration parameters)
def configure(conf)
  super
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd. It converts the event to a raw string.



50
51
52
# File 'lib/fluent/plugin/out_lm.rb', line 50

# Serializes a single event for buffering.
#
# The three values are packed together, in the order tag, time, record, as
# one array and encoded by calling #to_msgpack on that array.
#
# @param tag event tag
# @param time event time
# @param record event payload
# @return the result of Array#to_msgpack for [tag, time, record]
def format(tag, time, record)
  event_triple = [tag, time, record]
  event_triple.to_msgpack
end

#generate_token(events) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/out_lm.rb', line 119

# Builds the "LMv1" authorization value for a batch of events.
#
# The string that gets signed is the literal "POST", the current time in
# epoch milliseconds, the JSON encoding of +events+, and the literal
# "/log/ingest", concatenated in that order. It is HMAC-SHA256'd with
# @access_key, rendered as hex, and that hex text is then strict-Base64
# encoded.
#
# @param events batch whose #to_json output is included in the signature
# @return [String] "LMv1 <access id>:<signature>:<timestamp>"
def generate_token(events)
  epoch_millis = DateTime.now.strftime('%Q')
  string_to_sign = "POST#{epoch_millis}#{events.to_json}/log/ingest"

  sha256 = OpenSSL::Digest.new('sha256')
  hex_hmac = OpenSSL::HMAC.hexdigest(sha256, @access_key, string_to_sign)
  encoded_signature = Base64.strict_encode64(hex_hmac)

  "LMv1 #{@access_id}:#{encoded_signature}:#{epoch_millis}"
end

#send_batch(events) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_lm.rb', line 89

# Posts a batch of events to the LogicMonitor log ingest endpoint.
#
# The events are JSON-encoded and sent over HTTPS to
# https://<@company_name>.logicmonitor.com/rest/log/ingest with an
# authorization header produced by #generate_token. A non-success response
# is logged at error level; it is not raised. When @debug is set, the
# request and response bodies are logged as well.
#
# An empty batch is a no-op: nothing is logged and no request is made.
#
# @param events [Array] events to send; must respond to #to_json
# @return nil for an empty batch, otherwise the value of the final logging
#   statement (not meaningful to callers)
def send_batch(events)
  # Nothing to deliver, so skip the signed round trip entirely.
  return if events.empty?

  url = "https://#{@company_name}.logicmonitor.com/rest/log/ingest"
  body = events.to_json
  uri = URI.parse(url)

  log.info "Sending #{events.length} events to logic monitor at #{url}"
  log.info "Request json #{body}" if @debug

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = true

  request = Net::HTTP::Post.new(uri.request_uri)
  request['authorization'] = generate_token(events)
  request['Content-type'] = "application/json"
  request.body = body

  resp = http.request(request)
  log.info "Response #{resp.body}" if @debug

  log.error "Error sending batch #{resp.body}" unless resp.is_a?(Net::HTTPSuccess)
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



44
45
46
# File 'lib/fluent/plugin/out_lm.rb', line 44

# Shutdown hook for the plugin.
#
# This override releases nothing of its own; it only delegates to the parent
# class implementation via `super`.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



38
39
40
# File 'lib/fluent/plugin/out_lm.rb', line 38

# Start hook for the plugin.
#
# This override opens nothing of its own; it only delegates to the parent
# class implementation via `super`.
def start
  super
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE: This method is called by an internal thread, not Fluentd’s main thread, so waiting on I/O here doesn’t affect other plugins.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_lm.rb', line 61

# Turns a buffered chunk into LogicMonitor events and sends them as one batch.
#
# Every record yielded by the chunk is decorated in place:
#
# * "_lm.resourceId" - when the record has no value for this key, it is set
#   to a Hash built from @resource_mapping. Each mapping key is a
#   dot-separated path into the record (e.g. "a.b" reads record["a"]["b"]);
#   each mapping value is the name the looked-up value is stored under.
#   Paths that resolve to nil are left out of the Hash.
# * "timestamp" - set to the event time rendered in UTC via
#   DateTime#rfc3339.
#
# A path whose intermediate value is missing or is not a Hash resolves to nil
# instead of raising, so a single record lacking an optional parent field
# cannot fail the whole chunk.
#
# @param chunk object responding to #msgpack_each and yielding
#   [tag, time, record] triples
# @return whatever #send_batch returns
def write(chunk)
  events = []
  chunk.msgpack_each do |(_tag, time, record)|
    if record["_lm.resourceId"].nil?
      resource_map = {}
      @resource_mapping.each do |path, resource_key|
        # Walk the dotted path one segment at a time, stopping at nil as soon
        # as the current node is not a Hash.
        value = path.to_s.split('.').reduce(record) do |node, segment|
          node.is_a?(Hash) ? node[segment] : nil
        end
        resource_map[resource_key] = value unless value.nil?
      end
      record["_lm.resourceId"] = resource_map
    end

    record["timestamp"] = Time.at(time).utc.to_datetime.rfc3339

    log.info "Event #{record.to_json}" if @debug
    events.push(record)
  end
  send_batch(events)
end