Class: Fluent::LmOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::LmOutput
- Defined in:
- lib/fluent/plugin/out_lm.rb
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
This method is called before starting.
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
- #generate_token(events) ⇒ Object
- #send_batch(events) ⇒ Object
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
32 33 34 |
# File 'lib/fluent/plugin/out_lm.rb', line 32 def configure(conf) super end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd. Convert the event to a raw string.
50 51 52 |
# File 'lib/fluent/plugin/out_lm.rb', line 50 def format(tag, time, record) [tag, time, record].to_msgpack end |
#generate_token(events) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_lm.rb', line 119 def generate_token(events) = DateTime.now.strftime('%Q') signature = Base64.strict_encode64( OpenSSL::HMAC.hexdigest( OpenSSL::Digest.new('sha256'), @access_key, "POST#{}#{events.to_json}/log/ingest" ) ) "LMv1 #{@access_id}:#{signature}:#{}" end |
#send_batch(events) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fluent/plugin/out_lm.rb', line 89 def send_batch(events) url = "https://#{@company_name}.logicmonitor.com/rest/log/ingest" body = events.to_json uri = URI.parse(url) log.info "Sending #{events.length} events to logic monitor at #{url}" if @debug log.info "Request json #{body}" end http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = true request = Net::HTTP::Post.new(uri.request_uri) request['authorization'] = generate_token(events) request['Content-type'] = "application/json" request.body = body resp = http.request(request) if @debug log.info "Response #{resp.body}" end if !resp.kind_of? Net::HTTPSuccess log.error "Error sending batch #{resp.body}" end end |
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
44 45 46 |
# File 'lib/fluent/plugin/out_lm.rb', line 44 def shutdown super end |
#start ⇒ Object
This method is called when starting. Open sockets or files here.
38 39 40 |
# File 'lib/fluent/plugin/out_lm.rb', line 38 def start super end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/out_lm.rb', line 61 def write(chunk) events = [] chunk.msgpack_each do |(tag, time, record)| resource_map = {} lm_event = record if record["_lm.resourceId"] == nil @resource_mapping.each do |key, value| k = value nestedVal = record key.to_s.split('.').each { |x| nestedVal = nestedVal[x] } if nestedVal != nil resource_map[k] = nestedVal end end lm_event["_lm.resourceId"] = resource_map end lm_event["timestamp"] = Time.at(time).utc.to_datetime.rfc3339 if @debug log.info "Event #{lm_event.to_json}" end events.push(lm_event) end send_batch(events) end |