Class: Scalyr::ScalyrOut
- Inherits: Fluent::Plugin::Output
  - Ancestors: Object → Fluent::Plugin::Output → Scalyr::ScalyrOut
- Defined in: lib/fluent/plugin/out_scalyr.rb
Instance Method Summary
- #build_add_events_body(chunk) ⇒ Object
- #compat_parameters_default_chunk_key ⇒ Object
  Support for version 0.14.0.
- #configure(conf) ⇒ Object
- #create_request(events, current_threads) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #handle_response(response) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #post_request(uri, body) ⇒ Object
- #start ⇒ Object
- #to_millis(timestamp) ⇒ Object
  Explicit function to convert to milliseconds; it will make things easier to maintain if/when fluentd supports higher-than-second resolutions.
- #to_nanos(seconds, nsec) ⇒ Object
  Explicit function to convert to nanoseconds; it will make things easier to maintain if/when fluentd supports higher-than-second resolutions.
- #write(chunk) ⇒ Object
  Called by fluentd when a chunk of log messages is ready.
Instance Method Details
#build_add_events_body(chunk) ⇒ Object
(source lines 309–419)
# File 'lib/fluent/plugin/out_scalyr.rb', line 309
# Converts a buffer chunk into one or more addEvents request payloads.
#
# Every record in +chunk+ becomes a Scalyr event ({thread:, ts:, attrs:}).
# Events are packed greedily into requests whose serialized events total at
# most @max_request_buffer bytes. An event too large to fit even in an empty
# request is either truncated (see #truncate_event_message) and sent in a
# request of its own, or dropped with a warning.
#
# Records are mutated in place: "logfile" is added when missing and "parser"
# is set when @parser is configured.
#
# @param chunk [#msgpack_each] chunk whose entries are [tag, sec, nsec, record]
#   tuples, as written by #format
# @return [Array<Hash>] one entry per request, as returned by #create_request
def build_add_events_body(chunk)
  requests = []
  # state of the request currently being filled
  events = []
  current_threads = {}
  total_bytes = 0

  # Emits the pending request (when it holds any events) and starts a new one.
  start_new_request = lambda do
    requests << create_request(events, current_threads) unless events.empty?
    events = []
    current_threads = {}
    total_bytes = 0
  end

  chunk.msgpack_each do |(tag, sec, nsec, record)| # rubocop:disable Metrics/BlockLength
    timestamp = to_nanos(sec, nsec)
    thread_id = tag

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key? "logfile"

    # set per-event parser if it is configured
    record["parser"] = @parser unless @parser.nil?

    event = {thread: thread_id.to_s, ts: timestamp, attrs: record}

    # get json string of event to keep track of how many bytes we are sending
    begin
      event_json = event.to_json
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      $log.warn "JSON serialization of the event failed: #{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      time = Fluent::EventTime.new(sec, nsec)
      router.emit_error_event(tag, time, record, e)

      # Print attribute values for debugging / troubleshooting purposes
      $log.debug "Event attributes:"
      event[:attrs].each do |key, value|
        # NOTE: value doesn't always have an encoding attribute so we use .class which is always available
        $log.debug "\t#{key} (#{value.class}): '#{value}'"
      end

      # Recursively re-encode and sanitize potentially bad string values
      event[:attrs] = sanitize_and_reencode_value(event[:attrs])
      event_json = event.to_json
    end

    if total_bytes + event_json.bytesize > @max_request_buffer
      # the event doesn't fit in the current request: send what we have so far
      start_new_request.call

      if event_json.bytesize > @max_request_buffer
        # the event doesn't fit even in an empty request: truncate it or drop it.
        # This check runs whether or not earlier events were pending, so an
        # oversized event can never be shipped as-is.
        truncated = truncate_event_message(event, event_json)
        if truncated
          events << truncated
          current_threads[tag] = thread_id
          start_new_request.call
        end
        next
      end
    end

    # register the thread only once the event is really part of a request, so
    # a request reset above can't lose the thread of its first event
    events << event
    current_threads[tag] = thread_id
    total_bytes += event_json.bytesize
  end

  # create a final request with any left over events
  start_new_request.call
  requests
end

# Shrinks an event that is too large for a single request by truncating its
# "message" attribute and appending "...".
#
# #format renames the configured @message_field to "message" before records
# reach the buffer, so "message" is the attribute truncated here. The
# truncated copy is re-serialized to confirm it now fits.
#
# @param event [Hash] event built by #build_add_events_body
# @param event_json [String] JSON serialization of +event+
# @return [Hash, nil] a truncated copy of +event+, or nil when the event must be
#   dropped (no String message, or truncating it cannot make the event fit)
def truncate_event_message(event, event_json)
  attrs = event[:attrs]
  message = attrs["message"]
  excess = event_json.bytesize - @max_request_buffer

  # Every character removed from the message removes at least one byte from the
  # UTF-8 JSON output, so dropping `excess` characters plus room for "..." is
  # normally enough; the re-serialization below guards any remaining case.
  if message.is_a?(String) && message.bytesize >= excess + 3
    kept = message[0, [message.length - excess - 3, 0].max]
    candidate = event.merge(attrs: attrs.merge("message" => kept + "..."))
    if candidate.to_json.bytesize <= @max_request_buffer
      @log.warn "Received a record that cannot fit within max_request_buffer "\
                "(#{@max_request_buffer}) from #{attrs['logfile']}, serialized event size "\
                "is #{event_json.bytesize}. The #{@message_field} field will be truncated to fit."
      return candidate
    end
  end

  # otherwise we drop the event and save ourselves hitting a 4XX response from the server
  @log.warn "Received a record that cannot fit within max_request_buffer "\
            "(#{@max_request_buffer}) from #{attrs['logfile']}, serialized event size "\
            "is #{event_json.bytesize}. The #{@message_field} field too short to truncate, "\
            "dropping event."
  nil
end
#compat_parameters_default_chunk_key ⇒ Object
Support for version 0.14.0.
(source lines 63–65)
# File 'lib/fluent/plugin/out_scalyr.rb', line 63
# Default chunk key for pre-0.14.0 (compat_parameters) configurations.
# Returns an empty string, the same value #configure passes as
# default_chunk_key to compat_parameters_buffer.
#
# @return [String] always ""
def compat_parameters_default_chunk_key
  ""
end
#configure(conf) ⇒ Object
(source lines 75–148)
# File 'lib/fluent/plugin/out_scalyr.rb', line 75
# Reads and validates the plugin configuration.
#
# Besides delegating to the parent class via super, this:
# - records the installed fluent-plugin-scalyr gem version in @version
#   ("unknown" when the gem spec is not loaded),
# - converts pre-0.14.0 buffer settings via compat_parameters_buffer,
# - warns when the buffer chunk limit or @max_request_buffer exceed 6MB,
# - resolves @force_message_encoding into @message_encoding,
# - evaluates server_attributes String values of the form "#{expr}",
# - optionally sets server_attributes["serverHost"] to the local hostname,
# - builds @add_events_uri from @scalyr_server.
#
# @param conf [Fluent::Config::Element] plugin configuration section
# @raise [Fluent::ConfigError] when more than one flush thread is configured
def configure(conf)
  @version = if Gem.loaded_specs.key?("fluent-plugin-scalyr")
               Gem.loaded_specs["fluent-plugin-scalyr"].version
             else
               "unknown"
             end

  if conf.elements("buffer").empty?
    $log.warn "Pre 0.14.0 configuration file detected. Please consider updating your configuration file" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  compat_parameters_buffer(conf, default_chunk_key: "")

  super

  if @buffer.chunk_limit_size > 6_000_000
    $log.warn "Buffer chunk size is greater than 6Mb. This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  if @max_request_buffer > 6_000_000
    $log.warn "Maximum request buffer > 6Mb. This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  @message_encoding = nil
  if @force_message_encoding.to_s != ""
    begin
      @message_encoding = Encoding.find(@force_message_encoding)
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      $log.warn "No encoding '#{@force_message_encoding}' found. Ignoring"
    end
  end

  # Evaluate String values of the form "#{expr}" in server_attributes.
  # Every other value (including non-String ones) is passed through unchanged
  # rather than being discarded.
  if @server_attributes
    @server_attributes = @server_attributes.transform_values do |value|
      match = value.is_a?(String) && /^\#{(.*)}$/.match(value)
      # NOTE: the expression comes from the operator's config file and is executed as Ruby code
      match ? eval(match[1]) : value # rubocop:disable Security/Eval
    end
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost
    # ensure server_attributes is not nil
    @server_attributes = {} if @server_attributes.nil?

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    @server_attributes["serverHost"] = Socket.gethostname unless @server_attributes.key? "serverHost"
  end

  # build a new string rather than appending in place, so the configured
  # value is never mutated (and a frozen value cannot raise FrozenError)
  @scalyr_server += "/" unless @scalyr_server.end_with?("/")
  @add_events_uri = URI(@scalyr_server + "addEvents")

  num_threads = @buffer_config.flush_thread_count

  # forcibly limit the number of threads to 1 for now, to ensure requests always have incrementing timestamps
  if num_threads > 1
    raise Fluent::ConfigError, "num_threads is currently limited to 1. You specified #{num_threads}."
  end
end
#create_request(events, current_threads) ⇒ Object
(source lines 421–441)
# File 'lib/fluent/plugin/out_scalyr.rb', line 421
# Assembles one addEvents request from a batch of events.
#
# @param events [Array<Hash>] events belonging to this request
# @param current_threads [Hash] tag => thread id for the threads the events use;
#   each becomes a {id:, name: "Fluentd: <tag>"} entry
# @return [Hash] {body: String (JSON payload), record_count: Integer}
def create_request(events, current_threads)
  thread_list = current_threads.map { |tag, id| {id: id.to_s, name: "Fluentd: #{tag}"} }

  payload = {
    token: @api_write_token,
    client_timestamp: to_millis(Fluent::Engine.now).to_s,
    session: @session,
    events: events,
    threads: thread_list
  }

  # add server_attributes hash if it exists
  payload[:sessionInfo] = @server_attributes if @server_attributes

  {body: payload.to_json, record_count: events.size}
end
#format(tag, time, record) ⇒ Object
(source lines 158–192)
# File 'lib/fluent/plugin/out_scalyr.rb', line 158
# Serializes one record for the buffer as msgpack of [tag, sec, nsec, record].
#
# - A nil time becomes Fluent::Engine.now; Integer and Float times are
#   converted to Fluent::EventTime (Float fractions become nanoseconds).
# - When @message_field is not "message", that field is moved to "message"
#   (with a warning if "message" already existed).
# - When @message_encoding is set, the "message" value is re-encoded to UTF-8
#   with "<?>" replacements (if @replace_invalid_utf8 and the encoding is
#   UTF-8) or has its encoding forced to @message_encoding.
#
# @param tag [String] fluentd tag
# @param time [Fluent::EventTime, Integer, Float, nil] event time
# @param record [Hash] log record (mutated in place)
# @return [String] msgpack payload
def format(tag, time, record)
  time = Fluent::Engine.now if time.nil?

  # handle timestamps that are not EventTime types
  case time
  when Integer
    time = Fluent::EventTime.new(time)
  when Float
    whole, fraction = time.divmod(1)
    time = Fluent::EventTime.new(whole.to_i, (fraction * 10**9).to_i)
  end

  if @message_field != "message" && record.key?(@message_field)
    if record.key? "message"
      $log.warn "Overwriting log record field 'message'. You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten." # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
    end
    record["message"] = record.delete(@message_field)
  end

  message = record["message"] if @message_encoding && record.key?("message")
  if message
    if @replace_invalid_utf8 && @message_encoding == Encoding::UTF_8
      record["message"] = message.encode("UTF-8", invalid: :replace, undef: :replace, replace: "<?>").force_encoding("UTF-8") # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
    else
      message.force_encoding(@message_encoding)
    end
  end

  [tag, time.sec, time.nsec, record].to_msgpack
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError. Record is:\n\t#{record}"
  raise
end
#formatted_to_msgpack_binary ⇒ Object
(source lines 67–69)
# File 'lib/fluent/plugin/out_scalyr.rb', line 67
# Reports that #format produces msgpack binary
# (it ends with [tag, sec, nsec, record].to_msgpack).
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end
#handle_response(response) ⇒ Object
(source lines 271–307)
# File 'lib/fluent/plugin/out_scalyr.rb', line 271
# Interprets an addEvents HTTP response and raises on failure.
#
# A body that is not valid JSON is treated as a response whose status is
# "Invalid JSON response from server".
#
# @param response [#code, #body] HTTP response; code is a String such as "200"
# @raise [Scalyr::ServerError] when the JSON has no "status", when the status
#   is a non-client error, or when status is "success" but the code is not 200
# @raise [Scalyr::Client4xxError] for any 4xx response code
# @raise [Scalyr::ClientError] when the status contains "/client/"
def handle_response(response)
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash =
    begin
      JSON.parse(response.body)
    rescue StandardError
      {"status" => "Invalid JSON response from server"}
    end

  # make sure the JSON response has a "status" field
  unless response_hash.key?("status")
    $log.debug "JSON response does not contain status message"
    raise Scalyr::ServerError.new "JSON response does not contain status message"
  end

  status = response_hash["status"]

  # 4xx codes are handled separately
  raise Scalyr::Client4xxError.new status if response.code =~ /^4\d\d/

  if status == "success"
    # response code is a string not an int
    raise Scalyr::ServerError unless response.code.include?("200")
  elsif status =~ /discardBuffer/
    $log.warn "Received 'discardBuffer' message from server. Buffer dropped."
  elsif status =~ %r{/client/}i
    raise Scalyr::ClientError.new status
  else
    # don't check specifically for server, we assume all non-client errors are server errors
    raise Scalyr::ServerError.new status
  end
end
#multi_workers_ready? ⇒ Boolean
(source lines 71–73)
# File 'lib/fluent/plugin/out_scalyr.rb', line 71
# Declares that this output plugin may run with multiple fluentd workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#post_request(uri, body) ⇒ Object
(source lines 232–269)
# File 'lib/fluent/plugin/out_scalyr.rb', line 232
# POSTs +body+ as JSON to +uri+ over HTTPS and returns the raw response.
#
# When @ssl_verify_peer is set, the peer certificate is verified (TLS 1.2,
# optional custom CA bundle from @ssl_ca_bundle_path, depth @ssl_verify_depth).
# When @compression_type is "deflate" or "bz2", the body is compressed and a
# matching Content-Encoding header is sent. Any other value sends the body
# uncompressed and without a Content-Encoding header.
#
# @param uri [URI::Generic] endpoint, e.g. the addEvents URI
# @param body [String] JSON request body
# @return [Net::HTTPResponse]
def post_request(uri, body)
  https = Net::HTTP.new(uri.host, uri.port)
  https.use_ssl = true

  # verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path unless @ssl_ca_bundle_path.nil?
    https.ssl_version = :TLSv1_2
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  # use compression if enabled
  encoding = nil
  case @compression_type
  when "deflate"
    encoding = "deflate"
    body = Zlib::Deflate.deflate(body, @compression_level)
  when "bz2"
    encoding = "bz2"
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new io
    bz2.write body
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new uri.path
  post.add_field("Content-Type", "application/json")
  # Only advertise an encoding that was actually applied. Keying on
  # @compression_type would send an empty Content-Encoding header for an
  # unrecognized compression type.
  post.add_field("Content-Encoding", encoding) if encoding
  post.add_field("User-Agent", "fluent-plugin-scalyr;#{@version}")
  post.body = body

  https.request(post)
end
#start ⇒ Object
(source lines 150–156)
# File 'lib/fluent/plugin/out_scalyr.rb', line 150
# Starts the plugin: runs the parent start, then creates a fresh random
# session id (SecureRandom.uuid) and logs it with the plugin id, worker id
# and gem version.
def start
  super

  # Generate a session id. This will be called once for each <match> in fluent.conf that uses scalyr
  @session = SecureRandom.uuid

  banner = "Scalyr Fluentd Plugin ID id=#{plugin_id} worker=#{fluentd_worker_id} " \
           "session=#{@session} version=#{@version}"
  $log.info banner
end
#to_millis(timestamp) ⇒ Object
Explicit function to convert to milliseconds; it will make things easier to maintain if/when fluentd supports higher-than-second resolutions.
(source lines 228–230)
# File 'lib/fluent/plugin/out_scalyr.rb', line 228
# Converts a timestamp with whole seconds and a nanosecond part into integer
# milliseconds. Kept as an explicit helper so the conversion lives in one place
# if/when fluentd supports higher than second resolutions.
#
# @param timestamp [#sec, #nsec] e.g. a Fluent::EventTime
# @return [Integer] milliseconds (sub-millisecond nanoseconds are truncated)
def to_millis(timestamp)
  (timestamp.sec * 10**3) + (timestamp.nsec / 10**6)
end
#to_nanos(seconds, nsec) ⇒ Object
Explicit function to convert to nanoseconds; it will make things easier to maintain if/when fluentd supports higher-than-second resolutions.
(source lines 222–224)
# File 'lib/fluent/plugin/out_scalyr.rb', line 222
# Combines whole seconds and a nanosecond remainder into a single nanosecond
# count. Kept as an explicit helper so the conversion lives in one place
# if/when fluentd supports higher than second resolutions.
#
# @param seconds [Integer] whole seconds
# @param nsec [Integer] nanoseconds within that second
# @return [Integer] total nanoseconds
def to_nanos(seconds, nsec)
  nanos_per_second = 1_000_000_000
  seconds * nanos_per_second + nsec
end
#write(chunk) ⇒ Object
Called by fluentd when a chunk of log messages is ready.
(source lines 195–218)
# File 'lib/fluent/plugin/out_scalyr.rb', line 195
# Called by fluentd when a chunk of log messages is ready.
#
# Splits the chunk into addEvents requests (#build_add_events_body) and posts
# them in order. An OpenSSL::SSL::SSLError or Scalyr::Client4xxError is logged
# and only that request is discarded; the remaining requests are still sent.
# Any other exception raised by #post_request or #handle_response propagates
# out of #write (JSON::GeneratorError is logged first, then re-raised).
# NOTE(review): requests posted before a propagating error were already
# delivered, so a retry of the whole chunk may resend them.
#
# @param chunk [#size] buffer chunk handed over by fluentd
def write(chunk)
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = build_add_events_body(chunk)
  $log.debug "Chunk split into #{requests.size} request(s)."

  requests.each_with_index do |request, index|
    $log.debug "Request #{index + 1}/#{requests.size}: #{request[:body].bytesize} bytes"
    begin
      response = post_request(@add_events_uri, request[:body])
      handle_response(response)
    rescue OpenSSL::SSL::SSLError => e
      if e.message.include? "certificate verify failed"
        $log.warn "SSL certificate verification failed. Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
      end
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue Scalyr::Client4xxError => e
      $log.warn "4XX status code received for request #{index + 1}/#{requests.size}. Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{request[:body].bytesize}\n\tSample: #{request[:body][0, 1024]}..." # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end