Class: Fluent::Plugin::ApplicationInsightsOutput

Inherits: Output
Defined in: lib/fluent/plugin/out_application_insights.rb

Constant Summary

TELEMETRY_TYPES =
["RequestData", "RemoteDependencyData", "MessageData", "ExceptionData", "EventData", "MetricData", "PageViewData", "AvailabilityData"]

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#tc ⇒ Object

Returns the value of attribute tc.



# File 'lib/fluent/plugin/out_application_insights.rb', line 12

def tc
  @tc
end

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 46

def configure(conf)
  super

  @severity_level_mapping = {}
  @severity_level_verbose.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::VERBOSE }
  @severity_level_information.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::INFORMATION }
  @severity_level_warning.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::WARNING }
  @severity_level_error.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::ERROR }
  @severity_level_critical.each { |l| @severity_level_mapping[l.downcase] = Channel::Contracts::SeverityLevel::CRITICAL }

  context_tag_keys = []
  context_tag_keys.concat Channel::Contracts::Application.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Cloud.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Device.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Internal.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Location.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Operation.json_mappings.values
  context_tag_keys.concat Channel::Contracts::Session.json_mappings.values
  context_tag_keys.concat Channel::Contracts::User.json_mappings.values

  @context_tag_accessors = {}
  context_tag_sources.each do |tag, property_path|
    raise ArgumentError.new("Context tag '#{tag}' is invalid!") unless context_tag_keys.include?(tag)

    @context_tag_accessors[tag] = record_accessor_create(property_path)
  end
end
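
For orientation, here is a minimal, hypothetical match block that exercises this configuration path. The parameter names are assumed to mirror the instance variables read above, and each context tag key must be one of the standard Application Insights tag names collected into context_tag_keys:

<match **>
  @type application_insights
  # assumed parameter names, mirroring @instrumentation_key, @severity_level_warning and @context_tag_sources
  instrumentation_key 00000000-0000-0000-0000-000000000000
  severity_level_warning ["warn", "warning"]
  # keys must be valid context tag names (validated against json_mappings above);
  # values are record_accessor paths into the record
  context_tag_sources {
    "ai.cloud.role": "$.kubernetes.container_name"
  }
</match>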

#get_context_non_standard_schema(record) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 189

def get_context_non_standard_schema(record)
  context = Channel::TelemetryContext.new
  context.instrumentation_key = @instrumentation_key
  return context if @context_tag_sources.length == 0

  @context_tag_accessors.each do |tag, accessor|
    set_context_tag context, tag, accessor.call(record)
  end

  return context
end

#merge_extra_properties_standard_schema(record, envelope) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 164

def merge_extra_properties_standard_schema(record, envelope)
  return if record.empty?

  envelope.data["baseData"]["properties"] ||= {}
  envelope.data["baseData"]["properties"].merge!(record)
  stringify_properties(envelope.data["baseData"]["properties"])
end
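
A sketch of the effect, with plain hashes standing in for the envelope contents (keys and values are illustrative):

# record keys left over after the Envelope json_mappings were consumed
record = { "kubernetes" => { "pod_name" => "web-0" }, "stream" => "stdout" }

# envelope.data["baseData"]["properties"] before:  { "env" => "prod" }
# after merge_extra_properties_standard_schema(record, envelope):
#   { "env"        => "prod",
#     "kubernetes" => "{\"pod_name\":\"web-0\"}",   # nested values are JSON-serialized by stringify_properties
#     "stream"     => "stdout" }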

#process(tag, es) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 104

def process(tag, es)
  es.each do |time, record|
    # 'time' is a Fluent::EventTime object or an Integer. Convert it to ruby Time object.
    time_ruby = time.is_a?(Fluent::EventTime) ? Time.at(time.sec, time.nsec / 1000).utc : Time.at(time)
    if @standard_schema
      process_standard_schema_log record, time_ruby
    else
      process_non_standard_schema_log record, time_ruby
    end
  end
end
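
A quick sketch of the time conversion used here (the two-argument form of Time.at takes seconds and microseconds, hence nsec / 1000):

require 'time'
require 'fluent/time'

event_time = Fluent::EventTime.now
time_ruby  = Time.at(event_time.sec, event_time.nsec / 1000).utc
time_ruby.iso8601(7)   # e.g. "2024-01-01T12:34:56.7891230Z" (microsecond precision)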

#process_non_standard_schema_log(record, time) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 172

def process_non_standard_schema_log(record, time)
  time = record.delete(@time_property) || time
  context = get_context_non_standard_schema(record)
  message = record.delete @message_property
  severity_level_value = record.delete @severity_property
  severity_level = severity_level_value ? @severity_level_mapping[severity_level_value.to_s.downcase] : nil
  props = stringify_properties(record)

  data = Channel::Contracts::MessageData.new(
    :message => message || 'Null',
    :severity_level => severity_level || Channel::Contracts::SeverityLevel::INFORMATION,
    :properties => props || {}
  )

  @tc.channel.write(data, context, time)
end
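
As an illustration, assuming message_property is "message", severity_property is "severity", and "warn" is listed under severity_level_warning, a record like the following produces a MessageData trace:

record = { "message" => "disk usage above 90%", "severity" => "warn", "host" => "node-1" }

# After the deletes above:
#   message        => "disk usage above 90%"
#   severity_level => Channel::Contracts::SeverityLevel::WARNING
#   properties     => { "host" => "node-1" }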

#process_standard_schema_log(record, time) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 116

def process_standard_schema_log(record, time)
  if record["data"] && record["data"].is_a?(Hash) && record["data"]["baseType"] && record["data"]["baseData"]
    base_type = record["data"]["baseType"]

    if TELEMETRY_TYPES.include? base_type
      # If the record was processed by a JSON parser plugin (e.g., in_http uses it by default), the time property will have been removed. Add it back in this case.
      record["time"] ||= time.iso8601(7)
      record["iKey"] = @instrumentation_key
      set_name_property(record) if !record["name"]
      set_context_standard_schema record

      envelope = Channel::Contracts::Envelope.new
      Channel::Contracts::Envelope.json_mappings.each do |attr, name|
        property = record.delete(name)
        envelope.send(:"#{attr}=", property) if property
      end

      # There could be extra properties added during the fluentd pipeline. Merge the extra properties so they are not lost.
      merge_extra_properties_standard_schema record, envelope

      @tc.channel.queue.push(envelope)
    else
      log.debug "Unknown telemetry type #{base_type}. Event will be treated as as non standard schema event."
      process_non_standard_schema_log record, time
    end
  else
    log.debug "The event does not meet the standard schema of Application Insights output. Missing data, baseType or baseData property. Event will be treated as as non standard schema event."
    process_non_standard_schema_log record, time
  end
end
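
A sketch of a record that takes the standard-schema path; the field names follow the Application Insights envelope contract, and the values are purely illustrative:

record = {
  "name" => "Microsoft.ApplicationInsights.Request",   # optional, set_name_property fills it in when absent
  "time" => "2024-01-01T00:00:00.0000000Z",            # optional, defaults to the event time
  "data" => {
    "baseType" => "RequestData",
    "baseData" => {
      "id" => "|abc.123.", "name" => "GET /", "duration" => "00:00:00.2000000",
      "responseCode" => "200", "success" => true
    }
  }
}
# "RequestData" is in TELEMETRY_TYPES, so the record is wrapped in an Envelope and queued;
# any other baseType falls through to process_non_standard_schema_log.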

#set_context_standard_schema(record) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 154

def set_context_standard_schema(record)
  return if @context_tag_sources.length == 0

  record["tags"] = record["tags"] || {}
  @context_tag_accessors.each do |tag, accessor|
    tag_value = accessor.call(record)
    record["tags"][tag] = tag_value if !tag_value.nil?
  end
end

#set_context_tag(context, tag_name, tag_value) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 201

def set_context_tag(context, tag_name, tag_value)
  return if tag_value.nil?

  context_set = [context.application, context.cloud, context.device, context.location, context.operation, context.session, context.user]
  context_set.each do |c|
    c.class.json_mappings.each do |attr, name|
      if (name == tag_name)
        c.send(:"#{attr}=", tag_value)
        return
      end
    end
  end
end
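
For example, assuming the application_insights SDK maps the Cloud contract's role attribute to the tag name "ai.cloud.role", a call like the following ends up setting context.cloud.role:

context = Channel::TelemetryContext.new
set_context_tag(context, "ai.cloud.role", "web-frontend")
context.cloud.role   # => "web-frontend"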

#set_name_property(record) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 147

def set_name_property(record)
  normalizedIKey = @instrumentation_key.gsub("-", "")
  normalizedIKey = normalizedIKey.empty? ? "" : normalizedIKey + "."
  type = record["data"]["baseType"][0..-5]
  record["name"] = "Microsoft.ApplicationInsights.#{normalizedIKey}#{type}"
end
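
A worked example: with an instrumentation key of "aaaa-bbbb" (hyphens stripped) and a baseType of "RequestData" (the trailing "Data" removed by [0..-5]), the generated name is:

record = { "data" => { "baseType" => "RequestData" } }
# with @instrumentation_key = "aaaa-bbbb"
set_name_property(record)
record["name"]   # => "Microsoft.ApplicationInsights.aaaabbbb.Request"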

#shutdown ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 85

def shutdown
  super

  # Draining the events in the queue.
  # We need to make sure the work thread has finished. Otherwise, it's possible that the queue is empty, but the http request to send the data is not finished.
  # However, a drawback of waiting for the work thread to finish is that even if the events have been drained, it will still poll the queue for some time (default is 3 seconds, set by sender.send_time).
  # This can be improved if the SDK exposes another variable indicating whether the work thread is sending data or just polling the queue.
  while !@tc.channel.queue.empty? || @tc.channel.sender.work_thread != nil
    # It's possible that the work thread has already exited but there are still items in the queue.
    # https://github.com/Microsoft/ApplicationInsights-Ruby/blob/master/lib/application_insights/channel/asynchronous_sender.rb#L115
    # Trigger flush to make the work thread working again in this case.
    if @tc.channel.sender.work_thread == nil && !@tc.channel.queue.empty?
      @tc.flush
    end

    sleep(1)
  end
end

#start ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 74

def start
  super

  sender = @service_endpoint_uri.nil? ? Channel::AsynchronousSender.new : Channel::AsynchronousSender.new(@service_endpoint_uri)
  queue = Channel::AsynchronousQueue.new sender
  channel = Channel::TelemetryChannel.new nil, queue
  @tc = TelemetryClient.new @instrumentation_key, channel
  @tc.channel.queue.max_queue_length = @send_buffer_size
  @tc.channel.sender.send_buffer_size = @send_buffer_size
end
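
A hypothetical configuration touching the two knobs used here; the parameter names are assumed to mirror @send_buffer_size and @service_endpoint_uri:

<match **>
  @type application_insights
  instrumentation_key 00000000-0000-0000-0000-000000000000
  send_buffer_size 1000        # used for both the queue's max_queue_length and the sender's batch size
  # service_endpoint_uri https://my-ingestion-endpoint.example.com/v2/track   # optional; the SDK default is used when omitted
</match>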

#stringify_properties(record) ⇒ Object



# File 'lib/fluent/plugin/out_application_insights.rb', line 215

def stringify_properties(record)
  # If the property value is a json object or array, e.g., {"prop": {"inner_prop": value}}, it needs to be serialized.
  # Otherwise, the property will become {"prop": "[object Object]"} in the final telemetry.
  # The stringified property can be queried as described here: https://docs.loganalytics.io/docs/Language-Reference/Scalar-functions/parse_json()
  record.each do |key, value|
    if value.is_a?(Hash) || value.is_a?(Array)
      record[key] = JSON.generate(value)
    end
  end
  record
end
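
A quick illustration of the in-place conversion (JSON.generate requires the 'json' standard library):

props = { "pod" => "web-0", "labels" => { "app" => "web" }, "ports" => [80, 443] }
stringify_properties(props)
# => { "pod" => "web-0", "labels" => "{\"app\":\"web\"}", "ports" => "[80,443]" }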