Class: Fluent::Plugin::OCILoggingOutput

Inherits:
Output
  • Object
show all
Includes:
PublicLoggingSetup, PublicLoggingUtils
Defined in:
lib/fluent/plugin/out_oci_logging.rb

Overview

OCI Logging Fluentd Output plugin

Constant Summary collapse

PAYLOAD_SIZE =

Restricts the payload size to 9 MB.

9*1024*1024

Constants included from PublicLoggingSetup

PublicLoggingSetup::AUTO_SIGNER_TYPE, PublicLoggingSetup::LINUX_OCI_CONFIG_DIR, PublicLoggingSetup::PUBLIC_DEFAULT_DEBIAN_CA_PATH, PublicLoggingSetup::PUBLIC_DEFAULT_LINUX_CA_PATH, PublicLoggingSetup::PUBLIC_DEFAULT_UBUNTU_CA_PATH, PublicLoggingSetup::PUBLIC_DEFAULT_WINDOWS_CA_PATH, PublicLoggingSetup::PUBLIC_RESOURCE_PRINCIPAL_ENV_FILE, PublicLoggingSetup::R1_CA_PATH, PublicLoggingSetup::RETRIES, PublicLoggingSetup::USER_CONFIG_PROFILE_NAME, PublicLoggingSetup::USER_SIGNER_TYPE, PublicLoggingSetup::WINDOWS_OCI_CONFIG_DIR

Constants included from PublicLoggingUtils

PublicLoggingUtils::PUBLIC_CLIENT_SPEC_VERSION, PublicLoggingUtils::PUBLIC_LOGGING_PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PublicLoggingSetup

#add_rp_env_override, #create_instance_principal_signer, #create_resource_principal_signer, #get_federation_endpoint, #get_host_info_for_non_oci_instance, #get_host_info_for_oci_instance, #get_instance_md, #get_instance_md_with_retry, #get_instance_md_with_url, #get_logging_endpoint, #get_signer, #get_signer_type, #logger, #set_default_ca_file

Methods included from PublicLoggingUtils

#build_request, determine_config_profile_name, determine_linux_config_path, determine_windows_config_path, #encode_to_utf8, #flatten_hash, #get_modified_tag, #get_put_logs_details_request, #send_requests

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.



41
42
43
# File 'lib/fluent/plugin/out_oci_logging.rb', line 41

# Reader for the client attribute: hands back whatever @client currently
# holds (nil when nothing has been assigned yet).
def client; @client; end

#hostname ⇒ Object

Returns the value of attribute hostname.



41
42
43
# File 'lib/fluent/plugin/out_oci_logging.rb', line 41

# Reader for the hostname attribute: hands back whatever @hostname currently
# holds (nil when nothing has been assigned yet).
def hostname; @hostname; end

Instance Method Details

#configure(conf) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_oci_logging.rb', line 47

# Configures the plugin and builds the logging client.
#
# After delegating to the parent implementation, this resolves the signer
# type (honouring @principal_override), obtains the matching signer, and
# stores a new OCI::Loggingingestion::LoggingClient in @client. The client's
# request options are then overridden so that requests use @ca_file.
#
# @param conf [Object] plugin configuration, forwarded unchanged to super
def configure(conf)
  super
  log.debug 'determining the signer type'

  config, principal = get_signer_type(principal_override: @principal_override)
  request_signer = get_signer(config, principal)
  log.info "using authentication principal #{principal}"

  endpoint = get_logging_endpoint(@region, logging_endpoint_override: @logging_endpoint_override)
  @client = OCI::Loggingingestion::LoggingClient.new(
    config: config, endpoint: endpoint, signer: request_signer,
    proxy_settings: nil, retry_config: nil
  )

  overrides = { ca_file: @ca_file }
  @client.api_client.request_option_overrides = overrides
end

#start ⇒ Object



66
67
68
69
# File 'lib/fluent/plugin/out_oci_logging.rb', line 66

# Starts the plugin: runs the parent start hook first, then emits a
# debug-level 'start' log line.
def start
  super
  log.debug('start')
end

#write(chunk) ⇒ Object

Sync Buffered Output. Implement write() if your plugin uses a normal buffer.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_oci_logging.rb', line 74

# Sync buffered output: walks every record in +chunk+, adds it to a batch map
# through build_request, and hands the map to send_requests.
#
# A running size is kept as the JSON byte size of each flattened record. Once
# it reaches PAYLOAD_SIZE the accumulated batches are sent immediately and the
# accumulator starts over; whatever is still pending after the last record is
# sent at the end.
#
# A StandardError raised while handling a single record is logged and
# processing continues with the next record.
#
# @param chunk [Object] buffer chunk; must respond to #metadata (which in turn
#   responds to #tag), #unique_id, and #each yielding (time, record) pairs
def write(chunk)
  log.debug "writing chunk metadata #{chunk.metadata}",
            dump_unique_id_hex(chunk.unique_id)
  log_batches_map = {}
  # For standard chunk format (without #format() method)
  size = 0
  chunk.each do |time, record|
    begin
      # The tag is read from the chunk metadata. (`chunk..tag` would build a
      # Range from the chunk instead of reading the tag.)
      tag = get_modified_tag(chunk.metadata.tag)
      # Records without a 'tailed_path' key get an empty source identifier.
      source_identifier = record.fetch('tailed_path', '')
      content = flatten_hash(record)
      size += content.to_json.bytesize
      build_request(time, record, tag, log_batches_map, source_identifier)
      if size >= PAYLOAD_SIZE
        log.info "Exceeding payload size. Size : #{size}"
        send_requests(log_batches_map)
        # Start a fresh accumulator for the records that follow.
        log_batches_map = {}
        size = 0
      end
    rescue StandardError => e
      log.error(e.full_message)
    end
  end
  # Flush whatever is still pending after the last record.
  unless log_batches_map.empty?
    log.info "Payload size : #{size}"
    send_requests(log_batches_map)
  end
end