Class: Fluent::GoogleCloudOutput
- Inherits:
- BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::GoogleCloudOutput
- Includes:
- self::ConfigConstants, self::InternalConstants, self::ServiceConstants
- Defined in:
- lib/fluent/plugin/out_google_cloud.rb
Overview
fluentd output plugin for the Stackdriver Logging API
Defined Under Namespace
Modules: ConfigConstants, CredentialsInfo, InternalConstants, Platform, ServiceConstants
Constant Summary collapse
- PLUGIN_NAME =
'Fluentd Google Cloud Logging plugin'
- PLUGIN_VERSION =
'0.6.8'
- LOGGING_SCOPE =
Name of the Google cloud logging write scope.
'https://www.googleapis.com/auth/logging.write'
- METADATA_SERVICE_ADDR =
Address of the metadata service.
'169.254.169.254'
Instance Attribute Summary collapse
-
#common_labels ⇒ Object
readonly
Returns the value of attribute common_labels.
-
#project_id ⇒ Object
readonly
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
-
#resource ⇒ Object
readonly
Returns the value of attribute resource.
-
#vm_id ⇒ Object
readonly
Returns the value of attribute vm_id.
-
#zone ⇒ Object
readonly
Returns the value of attribute zone.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudOutput
constructor
A new instance of GoogleCloudOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudOutput
Returns a new instance of GoogleCloudOutput.
332 333 334 335 336 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 332

# Builds a new GoogleCloudOutput.
#
# Runs the parent initializer first, then points @log at the process-wide
# logger held in the $log global.
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars
end
Instance Attribute Details
#common_labels ⇒ Object (readonly)
Returns the value of attribute common_labels.
330 331 332 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 330

# Read-only accessor.
#
# @return [Object] the current value of @common_labels (nil until assigned).
def common_labels
  @common_labels
end
#project_id ⇒ Object (readonly)
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
326 327 328 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 326

# Read-only accessor, exposed so that tests can inspect metadata directly
# rather than only indirectly through the metadata sent with logs.
#
# @return [Object] the current value of @project_id (nil until assigned).
def project_id
  @project_id
end
#resource ⇒ Object (readonly)
Returns the value of attribute resource.
329 330 331 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 329

# Read-only accessor.
#
# @return [Object] the current value of @resource (nil until assigned).
def resource
  @resource
end
#vm_id ⇒ Object (readonly)
Returns the value of attribute vm_id.
328 329 330 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 328

# Read-only accessor.
#
# @return [Object] the current value of @vm_id (nil until assigned).
def vm_id
  @vm_id
end
#zone ⇒ Object (readonly)
Returns the value of attribute zone.
327 328 329 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 327

# Read-only accessor.
#
# @return [Object] the current value of @zone (nil until assigned).
def zone
  @zone
end
Instance Method Details
#configure(conf) ⇒ Object
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 338

# Validates the configuration and prepares the agent-level state used when
# writing log entries.
#
# In order, this method:
# 1. delegates to the parent implementation;
# 2. registers the request/entry counters when @enable_monitoring is set;
# 3. rejects configurations that still carry the removed auth parameters;
# 4. compiles regexp patterns, detects the platform and resolves @resource;
# 5. builds @tag_regexp_list, computes @common_labels, and freezes the
#    resource, its labels and the common labels;
# 6. logs the Logs viewer URL.
#
# @param conf the plugin configuration; only forwarded to the parent via
#   the implicit-argument `super`.
# @raise [Fluent::ConfigError] if any of auth_method, private_key_email,
#   private_key_path or private_key_passphrase is configured.
def configure(conf)
  super

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type
    @successful_requests_count = registry.counter(
      :stackdriver_successful_requests_count,
      'A number of successful requests to the Stackdriver Logging API')
    @failed_requests_count = registry.counter(
      :stackdriver_failed_requests_count,
      'A number of failed requests to the Stackdriver Logging API,'\
      ' broken down by the error code')
    @ingested_entries_count = registry.counter(
      :stackdriver_ingested_entries_count,
      'A number of log entries ingested by Stackdriver Logging')
    @dropped_entries_count = registry.counter(
      :stackdriver_dropped_entries_count,
      'A number of log entries dropped by the Stackdriver output plugin')
    @retried_entries_count = registry.counter(
      :stackdriver_retried_entries_count,
      'The number of log entries that failed to be ingested by the'\
      ' Stackdriver output plugin due to a transient error and were'\
      ' retried')
  end

  # Alert on old authentication configuration.
  unless @auth_method.nil? && @private_key_email.nil? &&
         @private_key_path.nil? && @private_key_passphrase.nil?
    extra = []
    extra << 'auth_method' unless @auth_method.nil?
    extra << 'private_key_email' unless @private_key_email.nil?
    extra << 'private_key_path' unless @private_key_path.nil?
    extra << 'private_key_passphrase' unless @private_key_passphrase.nil?

    fail Fluent::ConfigError,
         "#{PLUGIN_NAME} no longer supports auth_method.\n" \
         'Please remove configuration parameters: ' + extra.join(' ')
  end

  set_regexp_patterns

  @platform = detect_platform

  # Set required variables: @project_id, @vm_id, @vm_name and @zone.
  # NOTE(review): the call below was missing from the extracted listing (only
  # its comment survived); the name is restored from the plugin's helper of
  # that purpose -- confirm against the source file.
  set_required_metadata_variables

  # Retrieve monitored resource.
  # Fail over to retrieve monitored resource via the legacy path if we fail
  # to get it from Metadata Agent.
  @resource ||= determine_agent_level_monitored_resource_via_legacy

  # Set regexp that we should match tags against later on. Using a list
  # instead of a map to ensure order. For example, tags will be matched
  # against Cloud Functions first, then GKE.
  @tag_regexp_list = []
  if @resource.type == GKE_CONSTANTS[:resource_type]
    # We only support Cloud Functions logs for GKE right now.
    # NOTE(review): the method name `fetch_gce_metadata` was stripped from the
    # extracted listing at both call sites below -- confirm against the
    # source file.
    if fetch_gce_metadata('instance/attributes/'
                         ).split.include?('gcf_region')
      # Fetch this info and store it to avoid recurring
      # metadata server calls.
      @gcf_region = fetch_gce_metadata('instance/attributes/gcf_region')
      @tag_regexp_list << [
        CLOUDFUNCTIONS_CONSTANTS[:resource_type],
        @compiled_cloudfunctions_tag_regexp
      ]
    end
    @tag_regexp_list << [
      GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
    ]
  end

  # Determine the common labels that should be added to all log entries
  # processed by this logging agent.
  @common_labels = determine_agent_level_common_labels

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @resource.freeze
  @resource.labels.freeze
  @common_labels.freeze

  # Log an informational message containing the Logs viewer URL
  @log.info 'Logs viewer address: https://console.cloud.google.com/logs/',
            "viewer?project=#{@project_id}&resource=#{@resource.type}/",
            "instance_id/#{@vm_id}"
end
#shutdown ⇒ Object
436 437 438 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 436

# Shuts the plugin down. No plugin-specific teardown is performed here; the
# call is handed straight to the parent implementation.
def shutdown
  super
end
#start ⇒ Object
429 430 431 432 433 434 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 429

# Starts the plugin: runs the parent start hook, initializes the API client
# and resets the two one-shot flags.
def start
  super
  init_api_client
  # Cleared here; #write sets it to true after the first successful request
  # so that the success message is logged only once.
  @successful_call = false
  @timenanos_warning = false
end
#write(chunk) ⇒ Object
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 |
# File 'lib/fluent/plugin/out_google_cloud.rb', line 440

# Converts a buffered chunk into log entries and sends them to the
# Stackdriver Logging API, one request per (tag, local_resource_id) group.
#
# For every record an entry is built with its entry-level monitored resource,
# labels, severity, timestamp, optional trace id and payload. Each non-empty
# group is then sent through gRPC (when @use_grpc is set) or through the REST
# client.
#
# Error handling per request:
# * gRPC: CANCELLED, UNAVAILABLE, DEADLINE_EXCEEDED, INTERNAL and UNKNOWN are
#   re-raised so the chunk is retried; every other status drops the entries
#   and is logged.
# * REST: Google::Apis::ServerError is re-raised for retry;
#   AuthorizationError and ClientError drop the entries and are logged; any
#   other Google::Apis::Error propagates to the caller.
#
# @param chunk the buffered chunk, handed to
#   group_log_entries_by_tag_and_local_resource_id.
# @raise [GRPC::BadStatus, Google::Apis::Error] for the retryable and
#   unhandled failures described above.
def write(chunk)
  grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk)

  grouped_entries.each do |(tag, local_resource_id), arr|
    entries = []
    group_level_resource, group_level_common_labels =
      determine_group_level_monitored_resource_and_labels(
        tag, local_resource_id)

    arr.each do |time, record|
      entry_level_resource, entry_level_common_labels =
        determine_entry_level_monitored_resource_and_labels(
          group_level_resource, group_level_common_labels, record)

      is_json = false
      if @detect_json
        # Save the timestamp and severity if available, then clear it out to
        # allow for determining whether we should parse the log or message
        # field.
        # NOTE(review): the local name `timestamp` was stripped from the
        # extracted listing (here and where it is restored below) -- confirm
        # against the source file.
        timestamp = record.delete('time')
        severity = record.delete('severity')

        # If the log is json, we want to export it as a structured log
        # unless there is additional metadata that would be lost.
        record_json = nil
        if record.length == 1
          %w(log message msg).each do |field|
            if record.key?(field)
              record_json = parse_json_or_nil(record[field])
            end
          end
        end
        unless record_json.nil?
          record = record_json
          is_json = true
        end
        # Restore timestamp and severity if necessary. Note that we don't
        # want to override these keys in the JSON we've just parsed.
        record['time'] ||= timestamp if timestamp
        record['severity'] ||= severity if severity
      end

      # NOTE(review): the method name `compute_timestamp` was stripped from
      # the extracted listing -- confirm against the source file.
      ts_secs, ts_nanos = compute_timestamp(
        entry_level_resource.type, record, time)
      severity = compute_severity(
        entry_level_resource.type, record, entry_level_common_labels)

      # Coerce to Integer where possible; values that cannot be converted
      # (including nil) are kept as they are.
      ts_secs = begin
                  Integer ts_secs
                rescue ArgumentError, TypeError
                  ts_secs
                end
      ts_nanos = begin
                   Integer ts_nanos
                 rescue ArgumentError, TypeError
                   ts_nanos
                 end

      if @use_grpc
        entry = Google::Logging::V2::LogEntry.new(
          labels: entry_level_common_labels,
          resource: Google::Api::MonitoredResource.new(
            type: entry_level_resource.type,
            labels: entry_level_resource.labels.to_h
          ),
          severity: grpc_severity(severity)
        )
        # If "seconds" is null or not an integer, we will omit the timestamp
        # field and defer the decision on how to handle it to the downstream
        # Logging API. If "nanos" is null or not an integer, it will be set
        # to 0.
        if ts_secs.is_a?(Integer)
          ts_nanos = 0 unless ts_nanos.is_a?(Integer)
          # NOTE(review): the setter name `timestamp` was stripped from the
          # extracted listing -- confirm against the source file.
          entry.timestamp = Google::Protobuf::Timestamp.new(
            seconds: ts_secs,
            nanos: ts_nanos
          )
        end
      else
        # Remove the labels if we didn't populate them with anything.
        entry_level_resource.labels = nil if
          entry_level_resource.labels.empty?
        entry = Google::Apis::LoggingV2beta1::LogEntry.new(
          labels: entry_level_common_labels,
          resource: entry_level_resource,
          severity: severity,
          timestamp: {
            seconds: ts_secs,
            nanos: ts_nanos
          }
        )
      end

      # Get fully-qualified trace id for LogEntry "trace" field.
      fq_trace_id = record.delete(@trace_key)
      entry.trace = fq_trace_id if fq_trace_id

      set_log_entry_fields(record, entry)
      set_payload(entry_level_resource.type, record, entry, is_json)

      entries.push(entry)
    end
    # Don't send an empty request if we rejected all the entries.
    next if entries.empty?

    # The local variable shadows the helper of the same name; the call with
    # parentheses and arguments still resolves to the method.
    log_name = "projects/#{@project_id}/logs/#{log_name(
      tag, group_level_resource)}"

    # Does the actual write to the cloud logging api.
    client = api_client
    if @use_grpc
      begin
        labels_utf8_pairs = group_level_common_labels.map do |k, v|
          [k.encode('utf-8'), convert_to_utf8(v)]
        end

        write_request = Google::Logging::V2::WriteLogEntriesRequest.new(
          log_name: log_name,
          resource: Google::Api::MonitoredResource.new(
            type: group_level_resource.type,
            labels: group_level_resource.labels.to_h
          ),
          labels: labels_utf8_pairs.to_h,
          entries: entries
        )

        entries_count = entries.length
        client.write_log_entries(write_request)
        increment_successful_requests_count
        increment_ingested_entries_count(entries_count)

        # Let the user explicitly know when the first call succeeded, to aid
        # with verification and troubleshooting.
        unless @successful_call
          @successful_call = true
          @log.info 'Successfully sent gRPC to Stackdriver Logging API.'
        end

      rescue GRPC::Cancelled => error
        increment_failed_requests_count(GRPC::Core::StatusCodes::CANCELLED)
        increment_retried_entries_count(entries_count, error.code)
        # RPC cancelled, so retry via re-raising the error.
        @log.debug "Retrying #{entries_count} log message(s) later.",
                   error: error.to_s, error_code: error.code.to_s
        raise error

      rescue GRPC::BadStatus => error
        increment_failed_requests_count(error.code)
        case error.code
        when GRPC::Core::StatusCodes::CANCELLED,
             GRPC::Core::StatusCodes::UNAVAILABLE,
             GRPC::Core::StatusCodes::DEADLINE_EXCEEDED,
             GRPC::Core::StatusCodes::INTERNAL,
             GRPC::Core::StatusCodes::UNKNOWN
          # Server error, so retry via re-raising the error.
          increment_retried_entries_count(entries.length, error.code)
          @log.debug "Retrying #{entries_count} log message(s) later.",
                     error: error.to_s, error_code: error.code.to_s
          raise error
        when GRPC::Core::StatusCodes::UNIMPLEMENTED,
             GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED
          # Most client errors indicate a problem with the request itself
          # and should not be retried.
          increment_dropped_entries_count(entries_count)
          @log.warn "Dropping #{entries_count} log message(s)",
                    error: error.to_s, error_code: error.code.to_s
        when GRPC::Core::StatusCodes::UNAUTHENTICATED
          # Authorization error.
          # These are usually solved via a `gcloud auth` call, or by
          # modifying the permissions on the Google Cloud project.
          increment_dropped_entries_count(entries_count)
          @log.warn "Dropping #{entries_count} log message(s)",
                    error: error.to_s, error_code: error.code.to_s
        else
          # Assume this is a problem with the request itself and don't
          # retry.
          increment_dropped_entries_count(entries_count)
          @log.error "Unknown response code #{error.code} from the "\
                     "server, dropping #{entries_count} log message(s)",
                     error: error.to_s, error_code: error.code.to_s
        end
      end
    else
      begin
        write_request = \
          Google::Apis::LoggingV2beta1::WriteLogEntriesRequest.new(
            log_name: log_name,
            resource: group_level_resource,
            labels: group_level_common_labels,
            entries: entries)
        entries_count = entries.length

        # TODO: RequestOptions
        begin
          client.write_entry_log_entries(write_request)
        rescue Google::Apis::Error => error
          # Count every failed request, then let the outer rescue clauses
          # decide between retrying and dropping.
          increment_failed_requests_count(error.status_code)
          raise error
        end
        increment_successful_requests_count
        increment_ingested_entries_count(entries_count)

        # Let the user explicitly know when the first call succeeded, to aid
        # with verification and troubleshooting.
        unless @successful_call
          @successful_call = true
          @log.info 'Successfully sent to Stackdriver Logging API.'
        end

      rescue Google::Apis::ServerError => error
        # Server error, so retry via re-raising the error.
        increment_retried_entries_count(entries.length, error.status_code)
        @log.debug "Retrying #{entries_count} log message(s) later.",
                   error: error.to_s, error_code: error.status_code.to_s
        raise error

      rescue Google::Apis::AuthorizationError => error
        # Authorization error.
        # These are usually solved via a `gcloud auth` call, or by modifying
        # the permissions on the Google Cloud project.
        increment_dropped_entries_count(entries_count)
        @log.warn "Dropping #{entries_count} log message(s)",
                  error_class: error.class.to_s, error: error.to_s

      rescue Google::Apis::ClientError => error
        # Most ClientErrors indicate a problem with the request itself and
        # should not be retried.
        increment_dropped_entries_count(entries_count)
        @log.warn "Dropping #{entries_count} log message(s)",
                  error_class: error.class.to_s, error: error.to_s
      end
    end
  end
end