Class: Fluent::Plugin::LogIntelligenceOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LogIntelligenceOutput
- Defined in:
- lib/fluent/plugin/out_vmware_log_intelligence.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #create_lint_event(record) ⇒ Object
- #flatten_record(record, prefix = []) ⇒ Object
- #initialize ⇒ LogIntelligenceOutput (constructor): A new instance of LogIntelligenceOutput.
- #multi_workers_ready? ⇒ Boolean
- #retrieve_headers(conf) ⇒ Object
- #set_gzip_header(element) ⇒ Object
- #shorten_key(key) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #validate_uri(uri_string) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ LogIntelligenceOutput
Returns a new instance of LogIntelligenceOutput.
37 38 39 40 41 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 37
# Constructs the output plugin instance.
#
# Runs the parent constructor first, then loads the 'http' and 'uri'
# libraries at construction time rather than at file load time.
def initialize
  super
  %w[http uri].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 153
# Applies the plugin configuration and builds the HTTP client.
#
# After the parent class has processed +conf+, this validates
# @endpoint_url, parses the comma-separated @http_retry_statuses into an
# array of integers (@statuses), collects the request headers, and creates
# the Fluent::Plugin::HttpClient stored in @http_client.
#
# @param conf [Object] configuration object; passed to +super+ and to
#   #retrieve_headers
# @raise [Fluent::ConfigError] when @endpoint_url is rejected by #validate_uri
def configure(conf)
  super
  validate_uri(@endpoint_url)

  # `to_s` makes an unset option behave like an empty list, and blank
  # entries (e.g. "500,,502") are dropped instead of being turned into
  # status 0 by `to_i`.
  @statuses = @http_retry_statuses.to_s
                                  .split(',')
                                  .map(&:strip)
                                  .reject(&:empty?)
                                  .map(&:to_i)

  @headers = retrieve_headers(conf)

  @http_client = Fluent::Plugin::HttpClient.new(
    @endpoint_url, @verify_ssl, @headers, @statuses,
    @open_timeout, @read_timeout, @log)
end
#create_lint_event(record) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 83
# Converts one record into the flat event hash that is posted to LINT.
#
# * When @flatten_hashes is set the record is first flattened with
#   #flatten_record; otherwise it is used as-is.
# * Entries with a nil value are skipped.
# * Every key is normalised with #shorten_key. Values whose normalised key
#   is listed in @log_text_keys are joined (space separated) into the
#   "text" field; every other value is stored under its normalised key.
# * When two source keys normalise to the same field name their values are
#   joined with a single space.
#
# @param record [Hash] the event record
# @return [Hash] the event, always carrying a "text" entry, or an empty
#   hash when the collected text equals the two-character string
#   backslash + "n"
def create_lint_event(record)
  flattened_records = @flatten_hashes ? flatten_record(record, []) : record
  merged_records = {}
  log = ''

  flattened_records.each do |key, value|
    next if value.nil?

    key = shorten_key(key)
    # force_encoding mutates its receiver, so work on a copy whenever the
    # string is frozen instead of raising FrozenError.
    key = key.dup if key.frozen?
    key.force_encoding("utf-8")
    if value.is_a?(String)
      value = value.dup if value.frozen?
      value.force_encoding("utf-8")
    end

    if @log_text_keys.include?(key)
      text = "#{value}"
      # A value identical to the text collected so far is not appended again.
      next if log == text

      log = log.empty? ? text : "#{log} #{text}"
    else
      # LINT doesn't support duplicate fields: when this field name was
      # already stored, join the old and new values with a space.
      # Interpolation keeps this working for non-String values.
      value = "#{merged_records[key]} #{value}" if merged_records.key?(key)
      merged_records[key] = value
    end
  end

  merged_records["text"] = log

  # NOTE(review): this compares against backslash + "n" (two characters),
  # not a newline; kept as in the original -- confirm which was intended.
  if log == "\\n"
    {}
  else
    merged_records
  end
end
#flatten_record(record, prefix = []) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 131
# Recursively flattens a nested record into a single-level hash.
#
# Hash keys are accumulated in +prefix+ and joined with
# @flatten_hashes_separator once a leaf value is reached. A hash entry
# whose key is listed in @log_text_keys is copied over unflattened under
# its stringified key. Array elements are flattened under the same prefix,
# so a later element overwrites an earlier one that yields the same key.
#
# @param record [Object] hash, array or leaf value to flatten
# @param prefix [Array<String>] key path accumulated so far
# @return [Hash] flattened key => value pairs
def flatten_record(record, prefix=[])
  case record
  when Hash
    record.each_with_object({}) do |(name, child), flat|
      if @log_text_keys.include?(name)
        flat[name.to_s] = child
      else
        flat.merge!(flatten_record(child, prefix + [name.to_s]))
      end
    end
  when Array
    record.each_with_object({}) do |child, flat|
      flat.merge!(flatten_record(child, prefix))
    end
  else
    { prefix.join(@flatten_hashes_separator) => record }
  end
end
#multi_workers_ready? ⇒ Boolean
167 168 169 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 167
# Reports whether this output is ready for multi-worker operation.
#
# @return [Boolean] unconditionally true
def multi_workers_ready?
  true
end
#retrieve_headers(conf) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 55
# Builds the HTTP request headers from the 'headers' element of the
# configuration.
#
# The 'headers' element is extended with a gzip Content-Encoding entry when
# @http_compress is set and with a Bearer Authorization entry when
# @bearer_token is non-empty, then converted to a hash. Elements with any
# other name are left untouched. If several 'headers' elements exist, the
# last one wins.
#
# NOTE(review): when no 'headers' element is configured the result is an
# empty hash, so neither the gzip nor the Authorization header is sent --
# confirm that a 'headers' section is mandatory.
#
# @param conf [Object] configuration object responding to #elements
# @return [Hash] header name => value pairs ({} without a 'headers' element)
def retrieve_headers(conf)
  headers = {}
  conf.elements.each do |element|
    # Only the 'headers' section may be modified; other sections must not
    # receive HTTP header keys.
    next unless element.name == 'headers'

    set_gzip_header(element) if @http_compress
    # `to_s` treats an unset (nil) token like an empty one.
    unless @bearer_token.to_s.empty?
      element['Authorization'] = "Bearer #{@bearer_token}"
    end
    headers = element.to_hash
  end
  headers
end
#set_gzip_header(element) ⇒ Object
71 72 73 74 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 71
# Marks the given element as gzip-encoded by setting its
# 'Content-Encoding' entry to 'gzip'.
#
# @param element [#[]=] object to modify in place
# @return [Object] the same element that was passed in
def set_gzip_header(element)
  element.tap { |target| target['Content-Encoding'] = 'gzip' }
end
#shorten_key(key) ⇒ Object
76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 76
# Normalises a field name for LINT.
#
# LINT doesn't allow some characters in field 'name' (like '/', '-', '\',
# '.'), so each of them is replaced with @flatten_hashes_separator and the
# result is lower-cased. The argument itself is not modified.
#
# @param key [String] original field name
# @return [String] sanitised, lower-case field name
def shorten_key(key)
  sanitized = key.gsub(/[\/\.\-\\]/,@flatten_hashes_separator)
  sanitized.downcase
end
#shutdown ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 175
# Shuts the plugin down and closes the HTTP client, if one was created.
#
# Closing is best-effort: a failure while closing never propagates out of
# shutdown, but it is reported through @log (when present) instead of
# being discarded silently.
def shutdown
  super
  begin
    @http_client.close if @http_client
  rescue StandardError => e
    @log.warn("Failed to close HTTP client: #{e.message}") if @log
  end
end
#start ⇒ Object
171 172 173 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 171
# Starts the plugin. Adds nothing of its own; the call is simply passed on
# to the parent class.
def start
  super
end
#validate_uri(uri_string) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 43
# Validates the endpoint URL and stores the parsed result in @uri.
#
# The whole string must match the URI pattern: \A and \z anchor the entire
# input, whereas ^ and $ would accept a single matching line inside
# multi-line input. The pattern comes from URI::DEFAULT_PARSER.make_regexp
# rather than the obsolete URI.regexp.
#
# @param uri_string [String] the configured endpoint URL
# @raise [Fluent::ConfigError] when the string is not a URI or cannot be
#   parsed
def validate_uri(uri_string)
  unless uri_string.to_s =~ /\A#{URI::DEFAULT_PARSER.make_regexp}\z/
    raise Fluent::ConfigError, 'endpoint_url invalid'
  end

  begin
    @uri = URI.parse(uri_string)
  rescue URI::InvalidURIError
    raise Fluent::ConfigError, 'endpoint_url invalid'
  end
end
#write(chunk) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/fluent/plugin/out_vmware_log_intelligence.rb', line 183
# Sends one buffered chunk to the endpoint as a JSON array of LINT events.
#
# Rate limiting: when @rate_limit_msec is non-zero and a previous request
# was made less than @rate_limit_msec milliseconds ago, the chunk is NOT
# sent; an info message is logged and the method returns nil.
#
# Otherwise every record in the chunk is converted with #create_lint_event,
# the list is serialised to JSON, gzip-compressed when @http_compress is
# set, and posted through @http_client. The request time is recorded on
# both the compressed and the uncompressed path so that rate limiting also
# applies when compression is enabled.
#
# @param chunk [#each] yields (time, record) pairs
# @return [Object, nil] the result of @http_client.post, or nil when the
#   request was dropped by the rate limiter
def write(chunk)
  if @rate_limit_msec != 0 && !@last_request_time.nil?
    elapsed_msec = (Time.now.to_f - @last_request_time) * 1000.0
    if elapsed_msec < @rate_limit_msec
      @log.info('Dropped request due to rate limiting')
      return
    end
  end

  data = []
  chunk.each do |_time, record|
    data << create_lint_event(record)
  end
  payload = JSON.dump(data)

  if @http_compress
    gzip_body = Zlib::GzipWriter.new(StringIO.new)
    gzip_body << payload
    # GzipWriter#close returns the wrapped StringIO, which holds the
    # compressed bytes.
    payload = gzip_body.close.string
  end

  # Recorded before posting, so a failed attempt also counts towards the
  # rate limit (as on the original uncompressed path).
  @last_request_time = Time.now.to_f
  @http_client.post(payload)
end