Class: Fluent::Plugin::LogIntelligenceOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LogIntelligenceOutput
- Defined in:
- lib/fluent/plugin/out_vmware_log_intelligence.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_lint_event(record) ⇒ Object
- #flatten_record(record, prefix = []) ⇒ Object
-
#initialize ⇒ LogIntelligenceOutput
constructor
A new instance of LogIntelligenceOutput.
- #multi_workers_ready? ⇒ Boolean
- #retrieve_headers(conf) ⇒ Object
- #set_gzip_header(element) ⇒ Object
- #shorten_key(key) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #validate_uri(uri_string) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ LogIntelligenceOutput
Returns a new instance of LogIntelligenceOutput.
37 38 39 40 41 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 37 def initialize super require 'http' require 'uri' end |
Instance Method Details
#configure(conf) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 155 def configure(conf) super validate_uri(@endpoint_url) @statuses = @http_retry_statuses.split(',').map { |status| status.to_i } @statuses = [] if @statuses.nil? @headers = retrieve_headers(conf) @http_client = Fluent::Plugin::HttpClient.new( @endpoint_url, @verify_ssl, @headers, @statuses, @open_timeout, @read_timeout, @log) end |
#create_lint_event(record) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 84 def create_lint_event(record) flattened_records = {} merged_records = {} if @flatten_hashes flattened_records = flatten_record(record, []) else flattened_records = record end keys = [] log = '' flattened_records.each do |key, value| begin next if value.nil? # LINT doesn't support duplicate fields, make unique names by appending underscore key = shorten_key(key) if keys.include?(key) value = merged_records[key] + " " + value end keys.push(key) key.force_encoding("utf-8") if value.is_a?(String) @log.debug "VMware Log Intelligence force encoding" value.force_encoding("utf-8") end end if @log_text_keys.include?(key) if log != "#{value}" if log.empty? log = "#{value}" else log += " #{value}" end end else merged_records[key] = value end end merged_records["text"] = log if log == "\\n" {} else merged_records end end |
#flatten_record(record, prefix = []) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 133 def flatten_record(record, prefix=[]) ret = {} @log.debug "VMware Log Intelligence flattening record" case record when Hash record.each do |key, value| if @log_text_keys.include?(key) ret.merge!({key.to_s => value}) else ret.merge! flatten_record(value, prefix + [key.to_s]) end end when Array record.each do |value| ret.merge! flatten_record(value, prefix) end else return {prefix.join(@flatten_hashes_separator) => record} end ret end |
#multi_workers_ready? ⇒ Boolean
169 170 171 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 169 def multi_workers_ready? true end |
#retrieve_headers(conf) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 55 def retrieve_headers(conf) headers = {} conf.elements.each do |element| if @http_compress @log.debug "VMware Log Intelligence Compression enabled" set_gzip_header(element) end if element.name == 'headers' if @bearer_token != '' element['Authorization'] = 'Bearer ' + @bearer_token end headers = element.to_hash end end headers end |
#set_gzip_header(element) ⇒ Object
72 73 74 75 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 72 def set_gzip_header(element) element['Content-Encoding'] = 'gzip' element end |
#shorten_key(key) ⇒ Object
77 78 79 80 81 82 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 77 def shorten_key(key) # LINT doesn't allow some characters in field 'name' # like '/', '-', '\', '.', etc. so replace them with @flatten_hashes_separator key = key.gsub(/[\/\.\-\\]/,@flatten_hashes_separator).downcase key end |
#shutdown ⇒ Object
178 179 180 181 182 183 184 185 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 178 def shutdown super @log.debug "Shutting Down VMware Log Intelligence Shipper.." begin @http_client.close if @http_client rescue end end |
#start ⇒ Object
173 174 175 176 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 173 def start super @log.debug "Started VMware Log Intelligence Shipper.." end |
#validate_uri(uri_string) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 43 def validate_uri(uri_string) unless uri_string =~ /^#{URI::DEFAULT_PARSER.make_regexp}$/ fail Fluent::ConfigError, 'endpoint_url invalid' end begin @uri = URI.parse(uri_string) rescue URI::InvalidURIError raise Fluent::ConfigError, 'endpoint_url invalid' end end |
#write(chunk) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 187 def write(chunk) @log.debug "VMware Log Intelligence writing message" is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?) if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec) @log.info('Dropped request due to rate limiting') return end data = [] chunk.each do |time, record| data << create_lint_event(record) end if @http_compress @log.debug "VMware Log Intelligence sending compressed message" gzip_body = Zlib::GzipWriter.new(StringIO.new) gzip_body << Yajl.dump(data) @last_request_time = Time.now.to_f @http_client.post(gzip_body.close.string) else @log.debug "VMware Log Intelligence sending uncompressed message" @last_request_time = Time.now.to_f @http_client.post(Yajl.dump(data)) end end |