44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/logstash/codecs/kafkatimemachine.rb', line 44
# Builds an InfluxDB line-protocol metric describing Kafka lag for this
# event and emits it through the codec's on_event callback.
#
# Two independent metadata groups are read from the event:
#   * producer:  [@metadata][kafka_*_producer] / [logstash_kafka_read_time_producer]
#   * aggregate: [@metadata][kafka_*_aggregate] / [logstash_kafka_read_time_aggregate]
# A group is valid only when every field is present and non-empty. Lag is
# the Logstash read time minus the Kafka append time (milliseconds). The
# ktm_lag_type tag records which groups contributed: complete, producer,
# or aggregate. When neither group is valid an error is logged and no
# event is emitted.
#
# @param event [LogStash::Event] event carrying KafkaTimeMachine metadata
# @return [void]
def encode(event)
  kafka_datacenter_producer = event.get("[@metadata][kafka_datacenter_producer]")
  kafka_topic_producer = event.get("[@metadata][kafka_topic_producer]")
  kafka_consumer_group_producer = event.get("[@metadata][kafka_consumer_group_producer]")
  # Timestamps may arrive as strings; coerce to Float, nil on failure.
  # (Inline rescue kept deliberately for JRuby/older-Ruby compatibility;
  # Float(x, exception: false) needs Ruby >= 2.6.)
  kafka_append_time_producer = Float(event.get("[@metadata][kafka_append_time_producer]")) rescue nil
  logstash_kafka_read_time_producer = Float(event.get("[@metadata][logstash_kafka_read_time_producer]")) rescue nil

  kafka_producer_array = [kafka_datacenter_producer, kafka_topic_producer, kafka_consumer_group_producer, kafka_append_time_producer, logstash_kafka_read_time_producer]
  @logger.debug("kafka_producer_array: #{kafka_producer_array}")

  if kafka_producer_array.any? { |text| text.nil? || text.to_s.empty? }
    @logger.debug("kafka_producer_array invalid: Found null")
    error_string_producer = "Error in producer data: #{kafka_producer_array}"
    producer_valid = false
  else
    @logger.debug("kafka_producer_array valid")
    producer_valid = true
    logstash_kafka_read_time_producer = logstash_kafka_read_time_producer.to_i
    kafka_append_time_producer = kafka_append_time_producer.to_i
    kafka_producer_lag_ms = logstash_kafka_read_time_producer - kafka_append_time_producer
  end

  # The aggregate group reuses the producer datacenter tag on purpose:
  # both legs of the pipeline run in the same datacenter.
  kafka_topic_aggregate = event.get("[@metadata][kafka_topic_aggregate]")
  kafka_consumer_group_aggregate = event.get("[@metadata][kafka_consumer_group_aggregate]")
  kafka_append_time_aggregate = Float(event.get("[@metadata][kafka_append_time_aggregate]")) rescue nil
  logstash_kafka_read_time_aggregate = Float(event.get("[@metadata][logstash_kafka_read_time_aggregate]")) rescue nil

  kafka_aggregate_array = [kafka_datacenter_producer, kafka_topic_aggregate, kafka_consumer_group_aggregate, kafka_append_time_aggregate, logstash_kafka_read_time_aggregate]
  @logger.debug("kafka_aggregate_array: #{kafka_aggregate_array}")

  if kafka_aggregate_array.any? { |text| text.nil? || text.to_s.empty? }
    @logger.debug("kafka_aggregate_array invalid: Found null")
    error_string_aggregate = "Error in aggregate data: #{kafka_aggregate_array}"
    aggregate_valid = false
  else
    @logger.debug("kafka_aggregate_array valid")
    aggregate_valid = true
    logstash_kafka_read_time_aggregate = logstash_kafka_read_time_aggregate.to_i
    kafka_append_time_aggregate = kafka_append_time_aggregate.to_i
    kafka_aggregate_lag_ms = logstash_kafka_read_time_aggregate - kafka_append_time_aggregate
  end

  # InfluxDB line protocol expects a nanosecond-precision timestamp.
  kafka_logstash_influx_metric_time = (Time.now.to_f * (1000 * 1000 * 1000)).to_i

  if producer_valid && aggregate_valid
    # End-to-end lag: aggregate-side read time minus producer-side append time.
    kafka_total_lag_ms = logstash_kafka_read_time_aggregate - kafka_append_time_producer
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=complete,kafka_topic_aggregate=#{kafka_topic_aggregate},kafka_consumer_group_aggregate=#{kafka_consumer_group_aggregate},kafka_topic_producer=#{kafka_topic_producer},kafka_consumer_group_producer=#{kafka_consumer_group_producer} kafka_total_lag_ms=#{kafka_total_lag_ms},kafka_aggregate_lag_ms=#{kafka_aggregate_lag_ms},kafka_producer_lag_ms=#{kafka_producer_lag_ms} #{kafka_logstash_influx_metric_time}"
  elsif producer_valid
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=producer,kafka_topic_producer=#{kafka_topic_producer},kafka_consumer_group_producer=#{kafka_consumer_group_producer} kafka_producer_lag_ms=#{kafka_producer_lag_ms} #{kafka_logstash_influx_metric_time}"
  elsif aggregate_valid
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=aggregate,kafka_topic_aggregate=#{kafka_topic_aggregate},kafka_consumer_group_aggregate=#{kafka_consumer_group_aggregate} kafka_aggregate_lag_ms=#{kafka_aggregate_lag_ms} #{kafka_logstash_influx_metric_time}"
  else
    @logger.error("Error kafkatimemachine: Could not build valid response --> #{error_string_producer}, #{error_string_aggregate}")
    influx_line_protocol = nil
  end

  # Fix: previously a nil line-protocol string was still handed to
  # event.sprintf/@on_event when both metadata groups were invalid;
  # skip emission entirely in that case (the error is already logged).
  return if influx_line_protocol.nil?

  file_output(influx_line_protocol) if @enable_log
  @on_event.call(event, event.sprintf(influx_line_protocol))
end
|