Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_request(tag, time, record) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#format_labels(labels, record = {}) ⇒ Object
Builds the Loki label string from the configured labels and the record's fields (the "log" field is excluded).
- #format_url(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #handle_record(tag, time, record) ⇒ Object
- #http_opts(uri) ⇒ Object
-
#initialize ⇒ LokiOutput
constructor
A new instance of LokiOutput.
- #multi_workers_ready? ⇒ Boolean
- #prefer_buffered_processing ⇒ Object
- #process(tag, es) ⇒ Object
- #proxies ⇒ Object
- #send_request(req, uri) ⇒ Object
- #set_body(req, tag, time, record) ⇒ Object
- #set_header(req, tag, time, record) ⇒ Object
- #set_json_body(req, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ LokiOutput
Returns a new instance of LokiOutput.
13 14 15 |
# File 'lib/fluent/plugin/out_loki.rb', line 13

# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the parent class.
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_loki.rb', line 55

# Applies the plugin configuration and derives the settings used when
# sending requests.
#
# @param conf the configuration passed in by the framework
# @raise [Fluent::ConfigError] when buffering is enabled but 'tag' is not
#   one of the chunk keys
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  # Certificate verification is only disabled when explicitly requested.
  @ssl_verify_mode = @ssl_no_verify ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  @ca_file = @cacert_file
  @last_request_time = nil

  return if @chunk_key_tag || !@buffered

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end
#create_request(tag, time, record) ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/out_loki.rb', line 106

# Builds the POST request for the push endpoint ("/api/prom/push") under the
# URL returned by #format_url, with body and headers already populated.
#
# Trailing slashes on the base URL are stripped first so that an endpoint
# configured as "http://host:3100/" does not produce "//api/prom/push".
#
# @param tag [String] event tag
# @param time event time
# @param record [Hash] payload to serialise as the request body
# @return [Array(Net::HTTP::Post, URI)] the request and its destination
def create_request(tag, time, record)
  base_url = format_url(tag, time, record).sub(%r{/+\z}, "")
  uri = URI.parse("#{base_url}/api/prom/push")
  req = Net::HTTP::Post.new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  [req, uri]
end
#format(tag, time, record) ⇒ Object
195 196 197 |
# File 'lib/fluent/plugin/out_loki.rb', line 195

# Serialises one event as a msgpack-encoded [time, record] pair.
#
# @param tag [String] event tag (not used)
# @param time event time
# @param record [Hash] event payload
# @return [String] result of calling #to_msgpack on the pair
def format(tag, time, record)
  entry = [time, record]
  entry.to_msgpack
end
#format_labels(labels, record = {}) ⇒ Object
Builds the Loki label string from the configured labels and the record's fields (the "log" field is excluded).
172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/fluent/plugin/out_loki.rb', line 172

# Builds a label string of the form {key="value",...} from the given labels
# followed by every field of the record except "log".
#
# Compared with naive concatenation this:
# * returns "{}" (not "}") when there is nothing to emit,
# * escapes backslashes, double quotes and newlines inside values so the
#   result stays a well-formed quoted string,
# * tolerates nil for either argument.
#
# @param labels [Hash, nil] static labels
# @param record [Hash, nil] event record whose fields become extra labels
# @return [String] the formatted label string
def format_labels(labels, record = {})
  escape = ->(val) { val.to_s.gsub(/["\\\n]/) { |ch| ch == "\n" ? '\n' : "\\#{ch}" } }

  pairs = (labels || {}).map { |key, val| "#{key}=\"#{escape.call(val)}\"" }
  (record || {}).each do |key, val|
    # The "log" field is the log line itself, not a label.
    pairs << "#{key}=\"#{escape.call(val)}\"" unless key == "log"
  end

  "{#{pairs.join(',')}}"
end
#format_url(tag, time, record) ⇒ Object
78 79 80 |
# File 'lib/fluent/plugin/out_loki.rb', line 78

# Returns the base URL requests are sent to. The arguments are accepted but
# not used; the value always comes from @endpoint_url.
#
# @return the current value of @endpoint_url
def format_url(tag, time, record)
  @endpoint_url
end
#formatted_to_msgpack_binary? ⇒ Boolean
199 200 201 |
# File 'lib/fluent/plugin/out_loki.rb', line 199

# Reports that #format emits msgpack binary.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary?
  true
end
#handle_record(tag, time, record) ⇒ Object
183 184 185 186 187 188 189 |
# File 'lib/fluent/plugin/out_loki.rb', line 183

# Wraps a single record in the push payload ("streams" => labels + one
# entry) and sends it.
#
# The entry timestamp is the current wall-clock time rather than the event
# time: using the event time caused 'Entry out of order' errors on Loki's
# side.
#
# The debug message is passed as a block so the payload is only serialised
# to JSON when the logger actually emits debug output.
#
# @param tag [String] event tag
# @param time event time (forwarded to #create_request, not used for "ts")
# @param record [Hash] event record; its "log" field becomes the line
# @return whatever #send_request returns
def handle_record(tag, time, record)
  entry = { "ts" => Time.now.iso8601(3), "line" => record["log"] }
  stream = { "labels" => format_labels(@labels, record), "entries" => [entry] }
  rec = { "streams" => [stream] }

  log.debug { rec.to_json }

  req, uri = create_request(tag, time, rec)
  send_request(req, uri)
end
#http_opts(uri) ⇒ Object
115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_loki.rb', line 115

# Builds the options hash passed to Net::HTTP.start for the given URI.
#
# TLS is enabled only for https URIs, and the verify mode is only set when
# TLS is in use. The CA file is added only when @ca_file names an existing
# regular file; a nil @ca_file is skipped instead of raising from
# File.file?.
#
# @param uri [URI] request destination
# @return [Hash] options with :use_ssl and, when applicable, :verify_mode
#   and :ca_file
def http_opts(uri)
  opts = { :use_ssl => uri.scheme == 'https' }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
  opts[:ca_file] = @ca_file if @ca_file && File.file?(@ca_file)
  opts
end
#multi_workers_ready? ⇒ Boolean
203 204 205 |
# File 'lib/fluent/plugin/out_loki.rb', line 203

# Reports that this plugin may be used with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#prefer_buffered_processing ⇒ Object
191 192 193 |
# File 'lib/fluent/plugin/out_loki.rb', line 191

# Tells the framework whether buffered processing is wanted.
#
# @return the current value of @buffered
def prefer_buffered_processing
  @buffered
end
#process(tag, es) ⇒ Object
207 208 209 210 211 |
# File 'lib/fluent/plugin/out_loki.rb', line 207

# Non-buffered path: sends every event of the stream immediately via
# #handle_record.
#
# @param tag [String] event tag
# @param es event stream yielding (time, record) pairs
def process(tag, es)
  es.each { |time, record| handle_record(tag, time, record) }
end
#proxies ⇒ Object
124 125 126 |
# File 'lib/fluent/plugin/out_loki.rb', line 124

# Looks up the proxy URL from the environment. Variables are consulted in
# this order and the first one that is set wins:
# HTTPS_PROXY, HTTP_PROXY, http_proxy, https_proxy.
#
# @return [String, nil] the proxy URL, or nil when none is set
def proxies
  %w[HTTPS_PROXY HTTP_PROXY http_proxy https_proxy].each do |name|
    value = ENV[name]
    return value if value
  end
  nil
end
#send_request(req, uri) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/fluent/plugin/out_loki.rb', line 128

# Sends a prepared HTTP request to +uri+.
#
# The configured authentication scheme (:basic, :bearer or :jwt) is applied
# to the request in place, and the request is routed through the proxy
# returned by #proxies when there is one.
#
# Exceptions raised while sending are logged as warnings and re-raised only
# when @raise_on_error is truthy. Responses that are not a
# Net::HTTPSuccess are logged as warnings and never raised.
#
# NOTE: client-side rate limiting used to live here but is disabled;
# @last_request_time is still recorded on every attempt.
#
# @param req [Net::HTTPRequest] request to send
# @param uri [URI] destination of the request
# @return not meaningful to callers
def send_request(req, uri)
  res = nil
  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    end
    @last_request_time = Time.now.to_f

    # Proxy details, when present, are extra positional arguments that
    # follow the target host and port.
    start_args = [uri.host, uri.port]
    if (proxy = proxies)
      proxy_uri = URI.parse(proxy)
      start_args.push(proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password)
    end
    res = Net::HTTP.start(*start_args, **http_opts(uri)) { |http| http.request(req) }
  rescue => e # rescues every StandardError, e.g. the server did not respond
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
    unless res.is_a?(Net::HTTPSuccess)
      res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
      log.warn "failed to #{req.method} #{uri} (#{res_summary})"
    end
  end
end
#set_body(req, tag, time, record) ⇒ Object
82 83 84 85 |
# File 'lib/fluent/plugin/out_loki.rb', line 82

# Fills the request body with the JSON form of the record.
#
# @param req request object to populate
# @param record [Hash] payload to serialise
# @return the same request object
def set_body(req, tag, time, record)
  req.tap { |request| set_json_body(request, record) }
end
#set_header(req, tag, time, record) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/out_loki.rb', line 87

# Adds the tenant header and any custom headers to the request.
#
# "X-Scope-OrgID" is set only when @tenant is set; each pair in
# @custom_headers (when present) is copied onto the request afterwards.
#
# @param req request object supporting []=
# @return the same request object
def set_header(req, tag, time, record)
  req["X-Scope-OrgID"] = @tenant if @tenant
  (@custom_headers || {}).each { |name, value| req[name] = value }
  req
end
#set_json_body(req, data) ⇒ Object
101 102 103 104 |
# File 'lib/fluent/plugin/out_loki.rb', line 101

# Serialises +data+ with Yajl into the request body and marks the request
# as JSON.
#
# @param req request object supporting body= and []=
# @param data object to serialise
# @return [String] the content type that was assigned
def set_json_body(req, data)
  payload = Yajl.dump(data)
  req.body = payload
  req['Content-Type'] = 'application/json'
end
#shutdown ⇒ Object
74 75 76 |
# File 'lib/fluent/plugin/out_loki.rb', line 74

# Shutdown hook. Nothing plugin-specific is done here; the call is passed
# straight to the parent class.
def shutdown
  super
end
#start ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/out_loki.rb', line 70

# Start hook. Nothing plugin-specific is done here; the call is passed
# straight to the parent class.
def start
  super
end
#write(chunk) ⇒ Object
213 214 215 216 217 218 219 |
# File 'lib/fluent/plugin/out_loki.rb', line 213

# Buffered path: sends every record in the chunk via #handle_record.
#
# Placeholders in @endpoint_url are expanded from the chunk's metadata for
# the duration of this call only. The unexpanded template is restored
# afterwards, so later chunks with different metadata are expanded from the
# template rather than from an earlier chunk's result.
#
# NOTE(review): @endpoint_url is still shared instance state while the chunk
# is being sent — confirm this is not run from several flush threads at once.
#
# @param chunk buffer chunk providing #metadata and #msgpack_each
def write(chunk)
  metadata = chunk.metadata
  tag = metadata.tag
  url_template = @endpoint_url
  begin
    @endpoint_url = extract_placeholders(url_template, metadata)
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  ensure
    @endpoint_url = url_template
  end
end