Class: Fluent::GoogleCloudOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::GoogleCloudOutput
- Includes: Common::ServiceConstants, self::ConfigConstants, self::InternalConstants
- Defined in: lib/fluent/plugin/out_google_cloud.rb
Overview
Fluentd output plugin for the Stackdriver Logging API.
Defined Under Namespace
Modules: ConfigConstants, InternalConstants
Constant Summary
- PLUGIN_NAME =
'Fluentd Google Cloud Logging plugin'.freeze
- PLUGIN_VERSION =
Follows semver.org format.
    begin
      # Extract plugin version from file path.
      match_data = __FILE__.match(
        %r{fluent-plugin-google-cloud-(?<version>[^/]*)/})
      if match_data
        match_data['version']
      else
        # Extract plugin version by finding the spec this file was loaded from.
        dependency = Gem::Dependency.new('fluent-plugin-google-cloud')
        all_specs, = Gem::SpecFetcher.fetcher.spec_for_dependency(dependency)
        matching_version, = all_specs.grep(
          proc { |spec,| __FILE__.include?(spec.full_gem_path) }) do |spec,|
            spec.version.to_s
          end
        # If no matching version was found, return a valid but obviously wrong
        # value.
        matching_version || '0.0.0-unknown'
      end
    end.freeze
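The path-based branch of PLUGIN_VERSION can be tried in isolation. The sketch below is a simplified illustration, not the plugin's code: `version_from_path` is a hypothetical helper, both paths are made up, and the Gem spec lookup that the real constant performs before falling back to the placeholder is skipped.

```ruby
# Same pattern as in PLUGIN_VERSION: the named group captures everything
# between the gem name prefix and the next slash.
VERSION_PATTERN = %r{fluent-plugin-google-cloud-(?<version>[^/]*)/}

# Hypothetical helper: returns the version embedded in a path, or the
# placeholder value when the path does not contain a versioned gem directory.
# (The real constant tries a Gem spec lookup before using the placeholder.)
def version_from_path(path)
  match_data = path.match(VERSION_PATTERN)
  match_data ? match_data['version'] : '0.0.0-unknown'
end

# Made-up paths, used only for illustration.
gem_path = '/opt/gems/fluent-plugin-google-cloud-0.8.5/lib/fluent/plugin/out_google_cloud.rb'
dev_path = '/home/dev/src/plugin/lib/fluent/plugin/out_google_cloud.rb'

puts version_from_path(gem_path)
puts version_from_path(dev_path)
```

A path without a versioned gem directory, such as a source checkout, yields the placeholder, which is why the real constant consults the installed gem specs first.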
Constants included from Common::ServiceConstants
Common::ServiceConstants::APPENGINE_CONSTANTS, Common::ServiceConstants::COMPUTE_CONSTANTS, Common::ServiceConstants::DATAFLOW_CONSTANTS, Common::ServiceConstants::DATAPROC_CONSTANTS, Common::ServiceConstants::EC2_CONSTANTS, Common::ServiceConstants::GKE_CONSTANTS, Common::ServiceConstants::K8S_CONTAINER_CONSTANTS, Common::ServiceConstants::K8S_NODE_CONSTANTS, Common::ServiceConstants::K8S_POD_CONSTANTS, Common::ServiceConstants::ML_CONSTANTS, Common::ServiceConstants::SUBSERVICE_MAP, Common::ServiceConstants::SUBSERVICE_METADATA_ATTRIBUTES
Instance Attribute Summary
-
#common_labels ⇒ Object
readonly
Returns the value of attribute common_labels.
-
#monitoring_resource ⇒ Object
readonly
Returns the value of attribute monitoring_resource.
-
#project_id ⇒ Object
readonly
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
-
#resource ⇒ Object
readonly
Returns the value of attribute resource.
-
#vm_id ⇒ Object
readonly
Returns the value of attribute vm_id.
-
#zone ⇒ Object
readonly
Returns the value of attribute zone.
Class Method Summary
- .version_string ⇒ Object
Instance Method Summary
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudOutput
constructor
A new instance of GoogleCloudOutput.
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #update_uptime ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudOutput
Returns a new instance of GoogleCloudOutput.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 438

    def initialize
      super
      # use the global logger
      @log = $log # rubocop:disable Style/GlobalVars

      @failed_requests_count = nil
      @successful_requests_count = nil
      @dropped_entries_count = nil
      @ingested_entries_count = nil
      @retried_entries_count = nil

      @ok_code = nil
      @uptime_update_time = Time.now.to_i
    end
Instance Attribute Details
#common_labels ⇒ Object (readonly)
Returns the value of attribute common_labels.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 435

    def common_labels
      @common_labels
    end
#monitoring_resource ⇒ Object (readonly)
Returns the value of attribute monitoring_resource.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 436

    def monitoring_resource
      @monitoring_resource
    end
#project_id ⇒ Object (readonly)
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 431

    def project_id
      @project_id
    end
#resource ⇒ Object (readonly)
Returns the value of attribute resource.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 434

    def resource
      @resource
    end
#vm_id ⇒ Object (readonly)
Returns the value of attribute vm_id.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 433

    def vm_id
      @vm_id
    end
#zone ⇒ Object (readonly)
Returns the value of attribute zone.
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 432

    def zone
      @zone
    end
Class Method Details
.version_string ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 798

    def self.version_string
      @version_string ||= "google-fluentd/#{PLUGIN_VERSION}"
    end
Instance Method Details
#configure(conf) ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 453

    def configure(conf)
      super

      # TODO(qingling128): Remove this warning after the support is added. Also
      # remove the comment in the description of this configuration.
      unless @logging_api_url == DEFAULT_LOGGING_API_URL || @use_grpc
        @log.warn 'Detected customized logging_api_url while use_grpc is not' \
                  ' enabled. Customized logging_api_url for the non-gRPC path' \
                  ' is not supported. The logging_api_url option will be' \
                  ' ignored.'
      end

      # Alert on old authentication configuration.
      unless @auth_method.nil? && @private_key_email.nil? &&
             @private_key_path.nil? && @private_key_passphrase.nil?
        extra = []
        extra << 'auth_method' unless @auth_method.nil?
        extra << 'private_key_email' unless @private_key_email.nil?
        extra << 'private_key_path' unless @private_key_path.nil?
        extra << 'private_key_passphrase' unless @private_key_passphrase.nil?

        raise Fluent::ConfigError,
              "#{PLUGIN_NAME} no longer supports auth_method.\n" \
              "Please remove configuration parameters: #{extra.join(' ')}"
      end

      set_regexp_patterns

      @utils = Common::Utils.new(@log)

      @platform = @utils.detect_platform(@use_metadata_service)

      # Treat an empty setting of the credentials file path environment variable
      # as unset. This way the googleauth lib could fetch the credentials
      # following the fallback path.
      ENV.delete(CREDENTIALS_PATH_ENV_VAR) if
        ENV[CREDENTIALS_PATH_ENV_VAR] == ''

      # Set required variables: @project_id, @vm_id, @vm_name and @zone.
      @project_id = @utils.get_project_id(@platform, @project_id)
      @vm_id = @utils.get_vm_id(@platform, @vm_id)
      @vm_name = @utils.get_vm_name(@vm_name)
      @zone = @utils.get_location(@platform, @zone, @use_aws_availability_zone)

      # All metadata parameters must now be set.
      @utils.(
        @platform, @project_id, @zone, @vm_id)

      # Retrieve monitored resource.
      # Fail over to retrieve monitored resource via the legacy path if we fail
      # to get it from Metadata Agent.
      @resource ||= @utils.determine_agent_level_monitored_resource_via_legacy(
        @platform, @subservice_name, @detect_subservice, @vm_id, @zone)

      if @metrics_resource
        unless @metrics_resource[:type].is_a?(String)
          raise Fluent::ConfigError,
                'metrics_resource.type must be a string:' \
                " #{@metrics_resource}."
        end
        if @metrics_resource.key?(:labels)
          unless @metrics_resource[:labels].is_a?(Hash)
            raise Fluent::ConfigError,
                  'metrics_resource.labels must be a hash:' \
                  " #{@metrics_resource}."
          end
          extra_keys = @metrics_resource.reject do |k, _|
            k == :type || k == :labels
          end
          unless extra_keys.empty?
            raise Fluent::ConfigError,
                  "metrics_resource has unrecognized keys: #{extra_keys.keys}."
          end
        else
          extra_keys = @metrics_resource.reject do |k, _|
            k == :type || k.to_s.start_with?('labels.')
          end
          unless extra_keys.empty?
            raise Fluent::ConfigError,
                  "metrics_resource has unrecognized keys: #{extra_keys.keys}."
          end
          # Transform the Hash form of the metrics_resource config if necessary.
          resource_type = @metrics_resource[:type]
          resource_labels = @metrics_resource.each_with_object({}) \
            do |(k, v), h|
              h[k.to_s.sub('labels.', '')] = v if k.to_s.start_with? 'labels.'
            end
          @metrics_resource = { type: resource_type, labels: resource_labels }
        end
      end

      # If monitoring is enabled, register metrics in the default registry
      # and store metric objects for future use.
      if @enable_monitoring
        unless Monitoring::MonitoringRegistryFactory.supports_monitoring_type(
          @monitoring_type)
          @log.warn "monitoring_type '#{@monitoring_type}' is unknown; "\
                    'there will be no metrics'
        end
        if @metrics_resource
          @monitoring_resource = @utils.create_monitored_resource(
            @metrics_resource[:type], @metrics_resource[:labels])
        else
          @monitoring_resource = @resource
        end
        @registry = Monitoring::MonitoringRegistryFactory
                    .create(@monitoring_type, @project_id,
                            @monitoring_resource, @gcm_service_address)
        # Export metrics every 60 seconds.
        timer_execute(:export_metrics, 60) { @registry.export }
        # Uptime should be a gauge, but the metric definition is a counter and
        # we can't change it.
        @uptime_metric = @registry.counter(
          :uptime, [:version], 'Uptime of Logging agent',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        update_uptime
        timer_execute(:update_uptime, 1) { update_uptime }
        @successful_requests_count = @registry.counter(
          :stackdriver_successful_requests_count,
          [:grpc, :code],
          'A number of successful requests to the Stackdriver Logging API',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        @failed_requests_count = @registry.counter(
          :stackdriver_failed_requests_count,
          [:grpc, :code],
          'A number of failed requests to the Stackdriver Logging '\
          'API, broken down by the error code',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        @ingested_entries_count = @registry.counter(
          :stackdriver_ingested_entries_count,
          [:grpc, :code],
          'A number of log entries ingested by Stackdriver Logging',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        @dropped_entries_count = @registry.counter(
          :stackdriver_dropped_entries_count,
          [:grpc, :code],
          'A number of log entries dropped by the Stackdriver output plugin',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        @retried_entries_count = @registry.counter(
          :stackdriver_retried_entries_count,
          [:grpc, :code],
          'The number of log entries that failed to be ingested by '\
          'the Stackdriver output plugin due to a transient error '\
          'and were retried',
          'agent.googleapis.com/agent', 'CUMULATIVE')
        @ok_code = @use_grpc ? GRPC::Core::StatusCodes::OK : 200
      end

      # Set regexp that we should match tags against later on. Using a list
      # instead of a map to ensure order.
      @tag_regexp_list = []
      if @resource.type == GKE_CONSTANTS[:resource_type]
        @tag_regexp_list << [
          GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
        ]
      end

      # Determine the common labels that should be added to all log entries
      # processed by this logging agent.
      @common_labels = determine_agent_level_common_labels(@resource)

      # The resource and labels are now set up; ensure they can't be modified
      # without first duping them.
      @resource.freeze
      @resource.labels.freeze
      @common_labels.freeze

      if @use_grpc
        @construct_log_entry = method(:construct_log_entry_in_grpc_format)
        @write_request = method(:write_request_via_grpc)
      else
        @construct_log_entry = method(:construct_log_entry_in_rest_format)
        @write_request = method(:write_request_via_rest)
      end

      if [Common::Platform::GCE, Common::Platform::EC2].include?(@platform)
        # Log an informational message containing the Logs viewer URL
        @log.info 'Logs viewer address: https://console.cloud.google.com/logs/',
                  "viewer?project=#{@project_id}&resource=#{@resource.type}/",
                  "instance_id/#{@vm_id}"
      end
    end
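The metrics_resource handling in #configure accepts two shapes: a nested hash with a :labels key, or a flat hash whose label keys carry a 'labels.' prefix. The sketch below reproduces that normalization as a standalone function. It is an illustration under assumptions: `normalize_metrics_resource` is a hypothetical name, ArgumentError stands in for Fluent::ConfigError so that the example runs without Fluentd, and the sample resource values are made up.

```ruby
# Hypothetical standalone version of the metrics_resource validation and
# normalization performed in #configure. Raises ArgumentError where the
# plugin raises Fluent::ConfigError.
def normalize_metrics_resource(resource)
  raise ArgumentError, 'type must be a string' unless resource[:type].is_a?(String)

  if resource.key?(:labels)
    # Nested form: only :type and :labels are allowed.
    raise ArgumentError, 'labels must be a hash' unless resource[:labels].is_a?(Hash)
    extra = resource.reject { |k, _| k == :type || k == :labels }
    raise ArgumentError, "unrecognized keys: #{extra.keys}" unless extra.empty?
    resource
  else
    # Flat form: every key other than :type must start with 'labels.'.
    extra = resource.reject { |k, _| k == :type || k.to_s.start_with?('labels.') }
    raise ArgumentError, "unrecognized keys: #{extra.keys}" unless extra.empty?
    labels = resource.each_with_object({}) do |(k, v), h|
      h[k.to_s.sub('labels.', '')] = v if k.to_s.start_with?('labels.')
    end
    { type: resource[:type], labels: labels }
  end
end

# Made-up sample input in the flat form.
flat = { type: 'gce_instance', 'labels.zone': 'us-central1-a', 'labels.instance_id': '123' }
p normalize_metrics_resource(flat)
```

Note that the flat form produces string label keys, because the prefix is stripped from the stringified key.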
#multi_workers_ready? ⇒ Boolean
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 794

    def multi_workers_ready?
      true
    end
#shutdown ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 656

    def shutdown
      super
      # Export metrics on shutdown. This is a best-effort attempt, and it might
      # fail, for instance if there was a recent write to the same time series.
      @registry.export unless @registry.nil?
    end
#start ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 636

    def start
      super
      init_api_client
      @successful_call = false
      @timenanos_warning = false

      if @statusz_port > 0
        @log.info "Starting statusz server on port #{@statusz_port}"
        server_create(:out_google_cloud_statusz,
                      @statusz_port,
                      bind: '127.0.0.1') do |data, conn|
          if data.split(' ')[1] == '/statusz'
            write_html_response(data, conn, 200, Statusz.response(self))
          else
            write_html_response(data, conn, 404, "Not found\n")
          end
        end
      end
    end
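The statusz handler in #start routes on the second whitespace-separated token of the raw request data, which for an HTTP request line is the path. A minimal sketch of that check follows. `status_for` is a hypothetical helper and the request strings are made up; the real handler also writes an HTML response through write_html_response.

```ruby
# Hypothetical helper mirroring the routing check in #start: the second
# whitespace-separated token of the raw request is treated as the path.
def status_for(data)
  data.split(' ')[1] == '/statusz' ? 200 : 404
end

# Made-up raw requests, used only for illustration.
puts status_for("GET /statusz HTTP/1.1\r\nHost: localhost\r\n\r\n")
puts status_for("GET /other HTTP/1.1\r\nHost: localhost\r\n\r\n")
```

Because the comparison is exact, a path with a query string or a trailing slash would not match under this check.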
#update_uptime ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 802

    def update_uptime
      now = Time.now.to_i
      @uptime_metric.increment(
        by: now - @uptime_update_time,
        labels: { version: Fluent::GoogleCloudOutput.version_string })
      @uptime_update_time = now
    end
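#update_uptime reports uptime through a cumulative counter by adding only the seconds elapsed since the previous update. The sketch below shows that delta pattern with stand-in classes. `FakeCounter` and `UptimeTracker` are hypothetical names, the version label is a placeholder, and timestamps are passed in explicitly instead of read from Time.now so that the behavior is deterministic.

```ruby
# Hypothetical stand-in for a registry counter; it only accumulates a total
# and remembers the labels of the last increment.
class FakeCounter
  attr_reader :total, :labels

  def initialize
    @total = 0
    @labels = {}
  end

  def increment(by: 1, labels: {})
    @total += by
    @labels = labels
  end
end

# Hypothetical tracker using the same delta logic as #update_uptime.
class UptimeTracker
  def initialize(counter, now)
    @counter = counter
    @last_update = now
  end

  # Adds the seconds elapsed since the last update, then moves the marker.
  def update(now)
    @counter.increment(by: now - @last_update,
                       labels: { version: 'example/0.0.0' })
    @last_update = now
  end
end

counter = FakeCounter.new
tracker = UptimeTracker.new(counter, 1000)
tracker.update(1001)
tracker.update(1005)
puts counter.total
```

The counter total equals the time elapsed since the tracker was created, regardless of how irregular the update intervals are.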
#write(chunk) ⇒ Object
    # File 'lib/fluent/plugin/out_google_cloud.rb', line 663

    def write(chunk)
      grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk)

      requests_to_send = []
      grouped_entries.each do |(tag, local_resource_id), arr|
        entries = []
        group_level_resource, group_level_common_labels =
          determine_group_level_monitored_resource_and_labels(
            tag, local_resource_id)

        arr.each do |time, record|
          entry_level_resource, entry_level_common_labels =
            determine_entry_level_monitored_resource_and_labels(
              group_level_resource, group_level_common_labels, record)

          is_json = false
          if @detect_json
            # Save the following fields if available, then clear them out to
            # allow for determining whether we should parse the log or message
            # field.
            # This list should be in sync with
            # https://cloud.google.com/logging/docs/agent/configuration#special-fields.
            preserved_keys = [
              'time',
              'timeNanos',
              'timestamp',
              'timestampNanos',
              'timestampSeconds',
              'severity',
              @http_request_key,
              @insert_id_key,
              @labels_key,
              @operation_key,
              @source_location_key,
              @span_id_key,
              @trace_key,
              @trace_sampled_key
            ]

            # If the log is json, we want to export it as a structured log
            # unless there is additional metadata that would be lost.
            record_json = nil
            if (record.keys - preserved_keys).length == 1
              %w(log message msg).each do |field|
                if record.key?(field)
                  record_json = parse_json_or_nil(record[field])
                end
              end
            end
            unless record_json.nil?
              # Propagate these if necessary. Note that we don't want to
              # override these keys in the JSON we've just parsed.
              preserved_keys.each do |key|
                record_json[key] ||= record[key] if
                  record.key?(key) && !record_json.key?(key)
              end

              record = record_json
              is_json = true
            end
          end

          ts_secs, ts_nanos, = (record, time)
          ts_secs, ts_nanos = (, Time.now) \
            if @adjust_invalid_timestamps &&

          severity = compute_severity(
            entry_level_resource.type, record, entry_level_common_labels)

          dynamic_labels_from_payload = parse_labels(record)

          entry_level_common_labels = entry_level_common_labels.merge!(
            dynamic_labels_from_payload) if dynamic_labels_from_payload

          entry = @construct_log_entry.call(entry_level_common_labels,
                                            entry_level_resource,
                                            severity,
                                            ts_secs,
                                            ts_nanos)

          insert_id = record.delete(@insert_id_key)
          entry.insert_id = insert_id if insert_id
          span_id = record.delete(@span_id_key)
          entry.span_id = span_id if span_id
          trace = record.delete(@trace_key)
          entry.trace = compute_trace(trace) if trace
          trace_sampled = record.delete(@trace_sampled_key)
          entry.trace_sampled = parse_bool(trace_sampled) unless
            trace_sampled.nil?

          set_log_entry_fields(record, entry)
          set_payload(entry_level_resource.type, record, entry, is_json)

          entries.push(entry)
        end
        # Don't send an empty request if we rejected all the entries.
        next if entries.empty?

        log_name = "projects/#{@project_id}/logs/#{log_name(
          tag, group_level_resource)}"

        requests_to_send << {
          entries: entries,
          log_name: log_name,
          resource: group_level_resource,
          labels: group_level_common_labels
        }
      end

      if @split_logs_by_tag
        requests_to_send.each do |request|
          @write_request.call(request)
        end
      else
        # Combine all requests into one. The request level "log_name" will be
        # ported to the entry level. The request level "resource" and "labels"
        # are ignored as they should have been folded into the entry level
        # "resource" and "labels" already anyway.
        combined_entries = []
        requests_to_send.each do |request|
          request[:entries].each do |entry|
            # Modify entries in-place as they are not needed later on.
            entry.log_name = request[:log_name]
          end
          combined_entries.concat(request[:entries])
        end
        @write_request.call(entries: combined_entries) unless
          combined_entries.empty?
      end
    end
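The detect_json branch of #write promotes a record to a structured payload only when exactly one key remains after the preserved keys are set aside, and that key's value parses as JSON. The sketch below isolates that rule. It rests on several assumptions: `promote_json_payload` is a hypothetical name, the preserved-key list contains only the fixed string keys (the configurable keys such as @insert_id_key are left out), and `parse_json_or_nil` here is a simplified stand-in for the plugin's helper of the same name, whose body is not shown on this page.

```ruby
require 'json'

# Only the fixed keys from #write; the configurable keys are omitted here.
PRESERVED_KEYS = %w[time timeNanos timestamp timestampNanos
                    timestampSeconds severity].freeze
PAYLOAD_FIELDS = %w[log message msg].freeze

# Simplified stand-in: returns the parsed Hash, or nil if the input is not a
# string holding a JSON object.
def parse_json_or_nil(input)
  return nil unless input.is_a?(String)

  parsed = JSON.parse(input)
  parsed.is_a?(Hash) ? parsed : nil
rescue JSON::ParserError
  nil
end

# Hypothetical helper: returns the parsed payload (with preserved keys
# propagated) when the record qualifies, otherwise the record unchanged.
def promote_json_payload(record)
  return record unless (record.keys - PRESERVED_KEYS).length == 1

  record_json = nil
  PAYLOAD_FIELDS.each do |field|
    record_json = parse_json_or_nil(record[field]) if record.key?(field)
  end
  return record if record_json.nil?

  # Copy preserved keys across without overriding keys present in the JSON.
  PRESERVED_KEYS.each do |key|
    record_json[key] ||= record[key] if record.key?(key) && !record_json.key?(key)
  end
  record_json
end

p promote_json_payload('log' => '{"a":1}', 'severity' => 'INFO')
p promote_json_payload('log' => '{"a":1}', 'extra' => 'x')
```

In the second call the extra key would be lost by promotion, so the record is returned as-is, which matches the intent stated in the source comment.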