Class: Fluent::Plugin::ApplicationInsightsOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::ApplicationInsightsOutput
- Defined in:
- lib/fluent/plugin/out_application_insights.rb
Constant Summary collapse
- TELEMETRY_TYPES =
["RequestData", "RemoteDependencyData", "MessageData", "ExceptionData", "EventData", "MetricData", "PageViewData", "AvailabilityData"]
Instance Attribute Summary collapse
-
#tc ⇒ Object
Returns the value of attribute tc.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #get_context_non_standard_schema(record) ⇒ Object
- #merge_extra_properties_standard_schema(record, envelope) ⇒ Object
- #process(tag, es) ⇒ Object
- #process_non_standard_schema_log(record, time) ⇒ Object
- #process_standard_schema_log(record, time) ⇒ Object
- #set_context_standard_schema(record) ⇒ Object
- #set_context_tag(context, tag_name, tag_value) ⇒ Object
- #set_name_property(record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stringify_properties(record) ⇒ Object
Instance Attribute Details
#tc ⇒ Object
Returns the value of attribute tc.
12 13 14 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 12 def tc @tc end |
Instance Method Details
#configure(conf) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 46 def configure(conf) super @severity_level_mapping = {} @severity_level_verbose.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::VERBOSE } @severity_level_information.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::INFORMATION } @severity_level_warning.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::WARNING } @severity_level_error.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::ERROR } @severity_level_critical.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::CRITICAL } context_tag_keys = [] context_tag_keys.concat Channel::Contracts::Application.json_mappings.values context_tag_keys.concat Channel::Contracts::Cloud.json_mappings.values context_tag_keys.concat Channel::Contracts::Device.json_mappings.values context_tag_keys.concat Channel::Contracts::Internal.json_mappings.values context_tag_keys.concat Channel::Contracts::Location.json_mappings.values context_tag_keys.concat Channel::Contracts::Operation.json_mappings.values context_tag_keys.concat Channel::Contracts::Session.json_mappings.values context_tag_keys.concat Channel::Contracts::User.json_mappings.values @context_tag_accessors = {} context_tag_sources.each do |tag, property_path| raise ArgumentError.new("Context tag '#{tag}' is invalid!") unless context_tag_keys.include?(tag) @context_tag_accessors[tag] = record_accessor_create(property_path) end end |
#get_context_non_standard_schema(record) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 189 def get_context_non_standard_schema(record) context = Channel::TelemetryContext.new context.instrumentation_key = @instrumentation_key return context if @context_tag_sources.length == 0 @context_tag_accessors.each do |tag, accessor| set_context_tag context, tag, accessor.call(record) end return context end |
#merge_extra_properties_standard_schema(record, envelope) ⇒ Object
164 165 166 167 168 169 170 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 164 def merge_extra_properties_standard_schema(record, envelope) return if record.empty? envelope.data["baseData"]["properties"] ||= {} envelope.data["baseData"]["properties"].merge!(record) stringify_properties(envelope.data["baseData"]["properties"]) end |
#process(tag, es) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 104 def process(tag, es) es.each do |time, record| # 'time' is a Fluent::EventTime object or an Integer. Convert it to ruby Time object. time_ruby = time.is_a?(Fluent::EventTime) ? Time.at(time.sec, time.nsec / 1000).utc : Time.at(time) if @standard_schema process_standard_schema_log record, time_ruby else process_non_standard_schema_log record, time_ruby end end end |
#process_non_standard_schema_log(record, time) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 172 def process_non_standard_schema_log(record, time) time = record.delete(@time_property) || time context = get_context_non_standard_schema(record) = record.delete @message_property severity_level_value = record.delete @severity_property severity_level = severity_level_value ? @severity_level_mapping[severity_level_value.to_s.downcase] : nil props = stringify_properties(record) data = Channel::Contracts::MessageData.new( :message => || 'Null', :severity_level => severity_level || Channel::Contracts::SeverityLevel::INFORMATION, :properties => props || {} ) @tc.channel.write(data, context, time) end |
#process_standard_schema_log(record, time) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 116 def process_standard_schema_log(record, time) if record["data"] && record["data"].is_a?(Hash) && record["data"]["baseType"] && record["data"]["baseData"] base_type = record["data"]["baseType"] if TELEMETRY_TYPES.include? base_type # If the record is processed by json parser plugin, e.g., in_http use it by default, the time property will be removed. Add it back in this case. record["time"] ||= time.iso8601(7) record["iKey"] = @instrumentation_key set_name_property(record) if !record["name"] set_context_standard_schema record envelope = Channel::Contracts::Envelope.new Channel::Contracts::Envelope.json_mappings.each do |attr, name| property = record.delete(name) envelope.send(:"#{attr}=", property) if property end # There could be extra properties added during the fluentd pipeline. Merge the extra properties so they are not lost. merge_extra_properties_standard_schema record, envelope @tc.channel.queue.push(envelope) else log.debug "Unknown telemetry type #{base_type}. Event will be treated as as non standard schema event." process_non_standard_schema_log record, time end else log.debug "The event does not meet the standard schema of Application Insights output. Missing data, baseType or baseData property. Event will be treated as as non standard schema event." process_non_standard_schema_log record, time end end |
#set_context_standard_schema(record) ⇒ Object
154 155 156 157 158 159 160 161 162 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 154 def set_context_standard_schema(record) return if @context_tag_sources.length == 0 record["tags"] = record["tags"] || {} @context_tag_accessors.each do |tag, accessor| tag_value = accessor.call(record) record["tags"][tag] = tag_value if !tag_value.nil? end end |
#set_context_tag(context, tag_name, tag_value) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 201 def set_context_tag(context, tag_name, tag_value) return if tag_value.nil? context_set = [context.application, context.cloud, context.device, context.location, context.operation, context.session, context.user] context_set.each do |c| c.class.json_mappings.each do |attr, name| if (name == tag_name) c.send(:"#{attr}=", tag_value) return end end end end |
#set_name_property(record) ⇒ Object
147 148 149 150 151 152 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 147 def set_name_property(record) normalizedIKey = @instrumentation_key.gsub("-", "") normalizedIKey = normalizedIKey.empty? ? "" : normalizedIKey + "." type = record["data"]["baseType"][0..-5] record["name"] = "Microsoft.ApplicationInsights.#{normalizedIKey}#{type}" end |
#shutdown ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 85 def shutdown super # Draining the events in the queue. # We need to make sure the work thread has finished. Otherwise, it's possible that the queue is empty, but the http request to send the data is not finished. # However, a drawback of waiting for the work thread to finish is that even if the events have been drained, it will still poll the queue for some time (default is 3 seconds, set by sender.send_time). # This can be improved if the SDK exposes another variable indicating whether the work thread is sending data or just polling the queue. while !@tc.channel.queue.empty? || @tc.channel.sender.work_thread != nil # It's possible that the work thread has already exited but there are still items in the queue. # https://github.com/Microsoft/ApplicationInsights-Ruby/blob/master/lib/application_insights/channel/asynchronous_sender.rb#L115 # Trigger flush to make the work thread working again in this case. if @tc.channel.sender.work_thread == nil && !@tc.channel.queue.empty? @tc.flush end sleep(1) end end |
#start ⇒ Object
74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 74 def start super sender = @service_endpoint_uri.nil? ? Channel::AsynchronousSender.new : Channel::AsynchronousSender.new(@service_endpoint_uri) queue = Channel::AsynchronousQueue.new sender channel = Channel::TelemetryChannel.new nil, queue @tc = TelemetryClient.new @instrumentation_key, channel @tc.channel.queue.max_queue_length = @send_buffer_size @tc.channel.sender.send_buffer_size = @send_buffer_size end |
#stringify_properties(record) ⇒ Object
215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/fluent/plugin/out_application_insights.rb', line 215 def stringify_properties(record) # If the property value is a json object or array, e.g., {"prop": {"inner_prop": value}}, it needs to be serialized. # Otherwise, the property will become {"prop": "[object Object]"} in the final telemetry. # The stringified property can be queried as described here: https://docs.loganalytics.io/docs/Language-Reference/Scalar-functions/parse_json() record.each do |key, value| if value.is_a?(Hash) || value.is_a?(Array) record[key] = JSON.generate(value) end end record end |