Class: Scalyr::ScalyrOut

Inherits:
Fluent::Plugin::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_scalyr.rb

Instance Method Summary

Instance Method Details

#build_add_events_body(chunk) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/fluent/plugin/out_scalyr.rb', line 309

# Splits the records of a buffer chunk into Scalyr addEvents requests.
#
# Each record becomes an event {thread:, ts:, attrs:} where +thread+ is the
# fluentd tag (as a string) and +ts+ is the value of #to_nanos(sec, nsec).
# Events are accumulated until their serialized JSON size would exceed
# @max_request_buffer; the pending batch is then turned into a request with
# #create_request and a new batch is started.
#
# A single event that is too large on its own has its "message" attribute
# truncated (suffixed with "...") when the message is long enough to absorb
# the excess; otherwise the event is dropped with a warning.
#
# Side effects on each record: a "logfile" attribute ("/fluentd/<tag>") is
# added when absent, and "parser" is set when @parser is configured.
#
# @param chunk [#msgpack_each] yields [tag, sec, nsec, record] entries
# @return [Array] the requests built by #create_request, in order
def build_add_events_body(chunk)
  requests = []

  # state of the batch currently being accumulated: the unique scalyr threads
  # (tag => thread id) referenced by its events, the serialized byte count of
  # its events, and the events themselves
  current_threads = {}
  total_bytes = 0
  events = []

  chunk.msgpack_each do |(tag, sec, nsec, record)| # rubocop:disable Metrics/BlockLength
    timestamp = to_nanos(sec, nsec)
    thread_id = tag

    # add a logfile field if one doesn't exist
    record["logfile"] = "/fluentd/#{tag}" unless record.key? "logfile"

    # set per-event parser if it is configured
    record["parser"] = @parser unless @parser.nil?

    event = {thread: thread_id.to_s,
             ts:     timestamp,
             attrs:  record}

    # get json string of event to keep track of how many bytes we are sending
    begin
      event_json = event.to_json
    rescue JSON::GeneratorError, Encoding::UndefinedConversionError => e
      $log.warn "JSON serialization of the event failed: #{e.class}: #{e.message}"

      # Send the faulty event to a label @ERROR block and allow to handle it there (output to exceptions file for ex)
      time = Fluent::EventTime.new(sec, nsec)
      router.emit_error_event(tag, time, record, e)

      # Print attribute values for debugging / troubleshooting purposes
      $log.debug "Event attributes:"

      event[:attrs].each do |key, value|
        # NOTE: value doesn't always value.encoding attribute so we use .class which is always available
        $log.debug "\t#{key} (#{value.class}): '#{value}'"
      end

      # Recursively re-encode and sanitize potentially bad string values
      event[:attrs] = sanitize_and_reencode_value(event[:attrs])
      event_json = event.to_json
    end

    # generate new request if json size of events in the array exceed maximum request buffer size
    append_event = true
    if total_bytes + event_json.bytesize > @max_request_buffer
      # the case where a single event causes us to exceed the @max_request_buffer
      if events.empty?
        # #format moves the configured @message_field into "message", so that
        # is where the message text lives by now. Use event[:attrs] (not
        # record) because it may have been replaced by a sanitized copy above.
        attrs = event[:attrs]
        message = attrs["message"]
        overflow = event_json.bytesize - @max_request_buffer

        # Truncate only when dropping overflow + 3 bytes (room for "...") from
        # the message is enough for the event to fit.
        if message.is_a?(String) && message.bytesize >= overflow + 3
          @log.warn "Received a record that cannot fit within max_request_buffer "\
            "(#{@max_request_buffer}) from #{record['logfile']}, serialized event size "\
            "is #{event_json.bytesize}. The #{@message_field} field will be truncated to fit."
          # Drop the last (overflow + 3) characters. Every removed character
          # frees at least one byte of the serialized event (multibyte and
          # JSON-escaped characters free more), so the event fits after the
          # 3-byte "..." is appended.
          attrs["message"] = message[0...-(overflow + 3)] + "..."
          events << event
          current_threads[tag] = thread_id

        # otherwise we drop the event and save ourselves hitting a 4XX response from the server
        else
          @log.warn "Received a record that cannot fit within max_request_buffer "\
            "(#{@max_request_buffer}) from #{record['logfile']}, serialized event size "\
            "is #{event_json.bytesize}. The #{@message_field} field too short to truncate, "\
            "dropping event."
        end
        append_event = false
      end

      requests << create_request(events, current_threads) unless events.empty?

      total_bytes = 0
      current_threads = {}
      events = []
    end

    # if we haven't consumed the current event already, add it to the pending
    # batch and keep track of the json bytesize. Its thread is registered here,
    # after any flush above, so the thread is declared in the same request as
    # the event instead of being lost when current_threads is reset.
    if append_event
      events << event
      current_threads[tag] = thread_id
      total_bytes += event_json.bytesize
    end
  end

  # create a final request with any left over events
  requests << create_request(events, current_threads) unless events.empty?

  requests
end

#compat_parameters_default_chunk_key ⇒ Object

Support for Fluentd version 0.14.0:



63
64
65
# File 'lib/fluent/plugin/out_scalyr.rb', line 63

# Default chunk key for the v0.14 compatibility helpers: always the empty
# string, the same value #configure passes as default_chunk_key.
def compat_parameters_default_chunk_key
  ""
end

#configure(conf) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/fluent/plugin/out_scalyr.rb', line 75

# Fluentd configuration hook. Applies the v0.14 compat buffer parameters, lets
# the superclass parse the section, then validates and derives runtime settings.
#
# Instance state written here:
#   @version           - version of the loaded fluent-plugin-scalyr gem, or "unknown"
#   @message_encoding  - Encoding named by force_message_encoding, or nil
#   @server_attributes - String values of the exact form "#{expr}" are replaced
#                        by eval(expr); every other value is kept unchanged
#   @scalyr_server     - normalized to end with "/"
#   @add_events_uri    - URI of "addEvents" under @scalyr_server
#
# Warnings are logged for pre-0.14 configs and for chunk/request sizes above
# 6,000,000 bytes.
#
# @param conf the plugin's configuration element
# @raise [Fluent::ConfigError] if more than one flush thread is configured
def configure(conf)
  @version = if Gem.loaded_specs.key?("fluent-plugin-scalyr")
               Gem.loaded_specs["fluent-plugin-scalyr"].version
             else
               "unknown"
             end

  if conf.elements("buffer").empty?
    $log.warn "Pre 0.14.0 configuration file detected.  Please consider updating your configuration file" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  compat_parameters_buffer(conf, default_chunk_key: "")

  super

  if @buffer.chunk_limit_size > 6_000_000
    $log.warn "Buffer chunk size is greater than 6Mb.  This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  if @max_request_buffer > 6_000_000
    $log.warn "Maximum request buffer > 6Mb.  This may result in requests being rejected by Scalyr" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
  end

  @message_encoding = nil
  if @force_message_encoding.to_s != ""
    begin
      @message_encoding = Encoding.find(@force_message_encoding)
      $log.debug "Forcing message encoding to '#{@force_message_encoding}'"
    rescue ArgumentError
      $log.warn "No encoding '#{@force_message_encoding}' found.  Ignoring"
    end
  end

  # Evaluate "#{expr}" statements in string values of server_attributes.
  # Non-string values (numbers, booleans, ...) are kept as-is rather than
  # dropped. The pattern is anchored to the whole string (\A...\z) so that
  # only values consisting entirely of one "#{...}" are evaluated.
  # SECURITY: this evals arbitrary Ruby taken from the configuration file;
  # it is only safe with trusted configuration.
  if @server_attributes
    @server_attributes = @server_attributes.each_with_object({}) do |(key, value), evaluated|
      expression = value.is_a?(String) ? value[/\A\#{(.*)}\z/, 1] : nil
      evaluated[key] = expression ? eval(expression) : value # rubocop:disable Security/Eval
    end
  end

  # See if we should use the hostname as the server_attributes.serverHost
  if @use_hostname_for_serverhost

    # ensure server_attributes is not nil
    @server_attributes = {} if @server_attributes.nil?

    # only set serverHost if it doesn't currently exist in server_attributes
    # Note: Use strings rather than symbols for the key, because keys coming
    # from the config file will be strings
    unless @server_attributes.key? "serverHost"
      @server_attributes["serverHost"] = Socket.gethostname
    end
  end

  # build a new string instead of appending in place: the configured value may be frozen
  @scalyr_server = "#{@scalyr_server}/" unless @scalyr_server.end_with?("/")

  @add_events_uri = URI @scalyr_server + "addEvents"

  num_threads = @buffer_config.flush_thread_count

  # forcibly limit the number of threads to 1 for now, to ensure requests always have incrementing timestamps
  if num_threads > 1
    raise Fluent::ConfigError, "num_threads is currently limited to 1. You specified #{num_threads}."
  end
end

#create_request(events, current_threads) ⇒ Object



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/fluent/plugin/out_scalyr.rb', line 421

# Builds one addEvents request from a batch of events.
#
# The JSON body carries the write token, the current Fluentd time in
# milliseconds (as a string), the session id, the events, and one thread
# descriptor per entry of +current_threads+. @server_attributes is added as
# "sessionInfo" when set.
#
# @param events [Array<Hash>] events built by #build_add_events_body
# @param current_threads [Hash] tag => thread id for the events in the batch
# @return [Hash] {body: <JSON string>, record_count: <number of events>}
def create_request(events, current_threads)
  # one Scalyr thread descriptor per tag referenced by the batch
  threads = current_threads.map do |tag, id|
    {id: id.to_s, name: "Fluentd: #{tag}"}
  end

  body = {
    token:            @api_write_token,
    client_timestamp: to_millis(Fluent::Engine.now).to_s,
    session:          @session,
    events:           events,
    threads:          threads
  }
  body[:sessionInfo] = @server_attributes if @server_attributes

  {body: body.to_json, record_count: events.size}
end

#format(tag, time, record) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/fluent/plugin/out_scalyr.rb', line 158

# Normalizes one incoming record and serializes it for the buffer as
# msgpack([tag, sec, nsec, record]).
#
# - A nil +time+ becomes Fluent::Engine.now; Integer and Float times are
#   converted to Fluent::EventTime.
# - If @message_field is not "message" and the record has it, its value is
#   moved to "message", overwriting (with a warning) any existing "message".
# - If @message_encoding is set and "message" is a String, the message is
#   re-tagged with that encoding. With @replace_invalid_utf8 and UTF-8,
#   invalid byte sequences are replaced with "<?>" instead.
#
# The caller's record hash and message string are not modified, so other
# outputs sharing the same record are unaffected.
#
# @param tag [String] fluentd tag
# @param time [Fluent::EventTime, Integer, Float, nil] event time
# @param record [Hash] event record
# @return [String] msgpack-encoded [tag, sec, nsec, record]
def format(tag, time, record)
  time = Fluent::Engine.now if time.nil?

  # handle timestamps that are not EventTime types
  if time.is_a?(Integer)
    time = Fluent::EventTime.new(time)
  elsif time.is_a?(Float)
    whole, fraction = time.divmod 1 # get integer and decimal components
    time = Fluent::EventTime.new(whole.to_i, (fraction * 10**9).to_i)
  end

  # shallow copy: the renames/re-encodings below must not leak into the
  # record object owned by the caller
  record = record.dup

  if @message_field != "message" && record.key?(@message_field)
    if record.key? "message"
      $log.warn "Overwriting log record field 'message'.  You are seeing this warning because in your fluentd config file you have configured the '#{@message_field}' field to be converted to the 'message' field, but the log record already contains a field called 'message' and this is now being overwritten." # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
    end
    record["message"] = record.delete(@message_field)
  end

  message = record["message"]
  if @message_encoding && message.is_a?(String)
    record["message"] =
      if @replace_invalid_utf8 && (@message_encoding == Encoding::UTF_8)
        if [Encoding::UTF_8, Encoding::ASCII_8BIT].include?(message.encoding)
          # String#encode is a no-op when source and target encodings are
          # equal (so it cannot repair UTF-8 strings), and from BINARY it would
          # replace every non-ASCII byte. Treat the bytes as UTF-8 and scrub.
          message.dup.force_encoding(Encoding::UTF_8).scrub("<?>")
        else
          # properly tagged non-UTF-8 text: transcode it
          message.encode(Encoding::UTF_8, invalid: :replace, undef: :replace, replace: "<?>")
        end
      else
        message.dup.force_encoding(@message_encoding)
      end
  end

  [tag, time.sec, time.nsec, record].to_msgpack
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError.  Record is:\n\t#{record}"
  raise
end

#formatted_to_msgpack_binary ⇒ Object



67
68
69
# File 'lib/fluent/plugin/out_scalyr.rb', line 67

# Fluentd hook: always answers true, because #format returns a
# msgpack-encoded [tag, sec, nsec, record] array.
def formatted_to_msgpack_binary
  true
end

#handle_response(response) ⇒ Object



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/fluent/plugin/out_scalyr.rb', line 271

# Validates an addEvents HTTP response and raises on failure.
#
# The body should be a JSON object with a "status" field. Outcomes, in order:
# - body is not a JSON object with "status": raises Scalyr::ServerError
#   (an unparseable body is given the status "Invalid JSON response from server")
# - response code starting with 4xx: raises Scalyr::Client4xxError(status)
# - status matching /discardBuffer/: logs a warning and returns
# - other non-"success" status matching /client/ (case-insensitive):
#   raises Scalyr::ClientError(status); otherwise Scalyr::ServerError(status)
# - "success" with a code not containing "200": raises Scalyr::ServerError
#
# @param response response from #post_request (responds to #code and #body)
# @raise [Scalyr::Client4xxError, Scalyr::ClientError, Scalyr::ServerError]
def handle_response(response)
  $log.debug "Response Code: #{response.code}"
  $log.debug "Response Body: #{response.body}"

  response_hash = begin
    JSON.parse(response.body)
  rescue StandardError
    {"status" => "Invalid JSON response from server"}
  end

  # make sure the JSON reponse is an object with a "status" field; valid JSON
  # that is not an object (e.g. "[]") would otherwise fail with NoMethodError
  unless response_hash.is_a?(Hash) && response_hash.key?("status")
    $log.debug "JSON response does not contain status message"
    raise Scalyr::ServerError.new "JSON response does not contain status message"
  end

  status = response_hash["status"]

  if response.code =~ /^4\d\d/
    # 4xx codes are handled separately
    raise Scalyr::Client4xxError.new status
  elsif status != "success"
    if status =~ /discardBuffer/
      $log.warn "Received 'discardBuffer' message from server.  Buffer dropped."
    elsif status =~ %r{/client/}i
      raise Scalyr::ClientError.new status
    else # don't check specifically for server, we assume all non-client errors are server errors
      raise Scalyr::ServerError.new status
    end
  elsif !response.code.include? "200" # response code is a string not an int
    raise Scalyr::ServerError
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/fluent/plugin/out_scalyr.rb', line 71

# Fluentd hook: declares this output usable with multiple workers. Each worker
# runs #start separately and so generates its own session id.
def multi_workers_ready?
  true
end

#post_request(uri, body) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/fluent/plugin/out_scalyr.rb', line 232

# POSTs +body+ as JSON to +uri+ over HTTPS and returns the HTTP response.
#
# When @ssl_verify_peer is set, the server certificate is verified (against
# @ssl_ca_bundle_path when given, up to @ssl_verify_depth) and TLS 1.2 or newer
# is required. When @compression_type is "deflate" or "bz2", the body is
# compressed and a matching Content-Encoding header is sent. Any other value
# sends the body uncompressed without a Content-Encoding header.
#
# @param uri [URI] endpoint to post to (its host, port and path are used)
# @param body [String] serialized JSON request body
# @return [Net::HTTPResponse]
def post_request(uri, body)
  https = Net::HTTP.new(uri.host, uri.port)
  https.use_ssl = true

  # verify peers to prevent potential MITM attacks
  if @ssl_verify_peer
    https.ca_file = @ssl_ca_bundle_path unless @ssl_ca_bundle_path.nil?
    # TLS 1.2 as a floor rather than pinned via the deprecated ssl_version=,
    # which would prevent negotiating TLS 1.3
    https.min_version = OpenSSL::SSL::TLS1_2_VERSION
    https.verify_mode = OpenSSL::SSL::VERIFY_PEER
    https.verify_depth = @ssl_verify_depth
  end

  # use compression if enabled
  encoding = nil
  case @compression_type
  when "deflate"
    encoding = "deflate"
    body = Zlib::Deflate.deflate(body, @compression_level)
  when "bz2"
    encoding = "bz2"
    io = StringIO.new
    bz2 = RBzip2.default_adapter::Compressor.new io
    bz2.write body
    bz2.close
    body = io.string
  end

  post = Net::HTTP::Post.new uri.path
  post.add_field("Content-Type", "application/json")
  # only advertise an encoding when the body was actually compressed; an
  # unrecognized compression_type used to produce an empty header
  post.add_field("Content-Encoding", encoding) if encoding
  post.add_field("User-Agent", "fluent-plugin-scalyr;#{@version}")

  post.body = body

  https.request(post)
end

#start ⇒ Object



150
151
152
153
154
155
156
# File 'lib/fluent/plugin/out_scalyr.rb', line 150

# Fluentd start hook: after the superclass starts, generates a fresh random
# UUID session id for this plugin instance and logs it with the plugin id,
# worker id and plugin version.
def start
  super

  # one session per <match> section that uses scalyr (per plugin instance)
  @session = SecureRandom.uuid

  banner = "Scalyr Fluentd Plugin ID id=#{plugin_id} worker=#{fluentd_worker_id} " \
           "session=#{@session} version=#{@version}"
  $log.info banner
end

#to_millis(timestamp) ⇒ Object

Explicit function to convert to milliseconds; this will make things easier to maintain if/when Fluentd supports higher-than-second resolutions.



228
229
230
# File 'lib/fluent/plugin/out_scalyr.rb', line 228

# Converts a time object exposing +sec+ and +nsec+ (such as the value of
# Fluent::Engine.now) into milliseconds; the sub-millisecond part of +nsec+ is
# discarded by integer division.
def to_millis(timestamp)
  whole_ms = timestamp.sec * 1_000
  whole_ms + (timestamp.nsec / 1_000_000)
end

#to_nanos(seconds, nsec) ⇒ Object

Explicit function to convert to nanoseconds; this will make things easier to maintain if/when Fluentd supports higher-than-second resolutions.



222
223
224
# File 'lib/fluent/plugin/out_scalyr.rb', line 222

# Combines a seconds value and a nanoseconds remainder into a single
# nanosecond count.
def to_nanos(seconds, nsec)
  nanos_from_seconds = seconds * 1_000_000_000
  nanos_from_seconds + nsec
end

#write(chunk) ⇒ Object

Called by Fluentd when a chunk of log messages is ready.



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/fluent/plugin/out_scalyr.rb', line 195

# Fluentd write hook: splits +chunk+ into addEvents requests and posts them in order.
#
# An SSL error or a 4xx response on a request is logged and that request is
# discarded without retry; processing continues with the next request. Other
# errors raised by #post_request / #handle_response propagate to Fluentd.
# A JSON::GeneratorError is logged and re-raised.
#
# @param chunk the buffer chunk to send
def write(chunk)
  $log.debug "Size of chunk is: #{chunk.size}"
  requests = build_add_events_body(chunk)
  total = requests.size
  $log.debug "Chunk split into #{total} request(s)."

  requests.each.with_index(1) do |request, position|
    payload = request[:body]
    $log.debug "Request #{position}/#{total}: #{payload.bytesize} bytes"
    begin
      response = post_request(@add_events_uri, payload)
      handle_response(response)
    rescue OpenSSL::SSL::SSLError => e
      if e.message.include?("certificate verify failed")
        $log.warn "SSL certificate verification failed.  Please make sure your certificate bundle is configured correctly and points to a valid file. You can configure this with the ssl_ca_bundle_path configuration option. The current value of ssl_ca_bundle_path is '#{@ssl_ca_bundle_path}'" # rubocop:disable Layout/LineLength, Lint/RedundantCopDisableDirective
      end
      $log.warn e.message
      $log.warn "Discarding buffer chunk without retrying or logging to <secondary>"
    rescue Scalyr::Client4xxError => e
      $log.warn "4XX status code received for request #{position}/#{total}.  Discarding buffer without retrying or logging.\n\t#{response.code} - #{e.message}\n\tChunk Size: #{chunk.size}\n\tLog messages this request: #{request[:record_count]}\n\tJSON payload size: #{payload.bytesize}\n\tSample: #{payload[0, 1024]}..."
    end
  end
rescue JSON::GeneratorError
  $log.warn "Unable to format message due to JSON::GeneratorError."
  raise
end