Class: LogStash::Codecs::KafkaTimeMachine

Inherits: Base
Defined in: lib/logstash/codecs/kafkatimemachine.rb

Instance Method Summary

  • #decode(data) ⇒ Object
  • #encode(event) ⇒ Object
  • #file_output(output_line) ⇒ Object
  • #register ⇒ Object

Instance Method Details

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/kafkatimemachine.rb', line 39

def decode(data)
  raise "Not implemented"
end
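
This codec is encode-only, so decode always raises. A minimal sketch of the resulting behavior, assuming the codec can be instantiated directly (in practice Logstash constructs and registers codecs itself):

# Hypothetical direct use; instantiation details are assumptions
codec = LogStash::Codecs::KafkaTimeMachine.new
begin
  codec.decode("any raw bytes")
rescue RuntimeError => e
  puts e.message  # => "Not implemented"
end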

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/kafkatimemachine.rb', line 44

def encode(event)

  # Extract producer data and check for validity; note that kafka_datacenter_producer is used in both the producer and aggregate arrays
  kafka_datacenter_producer = event.get("[@metadata][kafka_datacenter_producer]")
  kafka_topic_producer = event.get("[@metadata][kafka_topic_producer]")
  kafka_consumer_group_producer = event.get("[@metadata][kafka_consumer_group_producer]")
  # Float() raises on missing or malformed values; rescue to nil so the validity check below can flag them
  kafka_append_time_producer = Float(event.get("[@metadata][kafka_append_time_producer]")) rescue nil
  logstash_kafka_read_time_producer = Float(event.get("[@metadata][logstash_kafka_read_time_producer]")) rescue nil

  kafka_producer_array = Array[kafka_datacenter_producer, kafka_topic_producer, kafka_consumer_group_producer, kafka_append_time_producer, logstash_kafka_read_time_producer]
  @logger.debug("kafka_producer_array: #{kafka_producer_array}")

  if (kafka_producer_array.any? { |text| text.nil? || text.to_s.empty? })
    @logger.debug("kafka_producer_array invalid: Found null")
    error_string_producer = "Error in producer data: #{kafka_producer_array}"
    producer_valid = false
  else
    @logger.debug("kafka_producer_array valid")
    producer_valid = true
    logstash_kafka_read_time_producer = logstash_kafka_read_time_producer.to_i
    kafka_append_time_producer = kafka_append_time_producer.to_i
    kafka_producer_lag_ms = logstash_kafka_read_time_producer - kafka_append_time_producer
  end

  # Extract aggregate data and check for validity
  kafka_topic_aggregate = event.get("[@metadata][kafka_topic_aggregate]")
  kafka_consumer_group_aggregate = event.get("[@metadata][kafka_consumer_group_aggregate]")
  kafka_append_time_aggregate = Float(event.get("[@metadata][kafka_append_time_aggregate]")) rescue nil
  logstash_kafka_read_time_aggregate = Float(event.get("[@metadata][logstash_kafka_read_time_aggregate]")) rescue nil

  kafka_aggregate_array = Array[kafka_datacenter_producer, kafka_topic_aggregate, kafka_consumer_group_aggregate, kafka_append_time_aggregate, logstash_kafka_read_time_aggregate]
  @logger.debug("kafka_aggregate_array: #{kafka_aggregate_array}")

  if (kafka_aggregate_array.any? { |text| text.nil? || text.to_s.empty? })
    @logger.debug("kafka_aggregate_array invalid: Found null")
    error_string_aggregate = "Error in aggregate data: #{kafka_aggregate_array}"
    aggregate_valid = false
  else
    @logger.debug("kafka_aggregate_array valid")
    aggregate_valid = true
    logstash_kafka_read_time_aggregate = logstash_kafka_read_time_aggregate.to_i
    kafka_append_time_aggregate = kafka_append_time_aggregate.to_i
    kafka_aggregate_lag_ms = logstash_kafka_read_time_aggregate - kafka_append_time_aggregate
  end

  # Get current time in nanoseconds for the InfluxDB line protocol timestamp
  kafka_logstash_influx_metric_time = (Time.now.to_f * (1000*1000*1000)).to_i

  if (producer_valid == true && aggregate_valid == true)
    kafka_total_lag_ms = logstash_kafka_read_time_aggregate - kafka_append_time_producer
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=complete,kafka_topic_aggregate=#{kafka_topic_aggregate},kafka_consumer_group_aggregate=#{kafka_consumer_group_aggregate},kafka_topic_producer=#{kafka_topic_producer},kafka_consumer_group_producer=#{kafka_consumer_group_producer} kafka_total_lag_ms=#{kafka_total_lag_ms},kafka_aggregate_lag_ms=#{kafka_aggregate_lag_ms},kafka_producer_lag_ms=#{kafka_producer_lag_ms} #{kafka_logstash_influx_metric_time}"
  elsif (producer_valid == true && aggregate_valid == false)
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=producer,kafka_topic_producer=#{kafka_topic_producer},kafka_consumer_group_producer=#{kafka_consumer_group_producer} kafka_producer_lag_ms=#{kafka_producer_lag_ms} #{kafka_logstash_influx_metric_time}"
  elsif (aggregate_valid == true && producer_valid == false)
    influx_line_protocol = "kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=#{kafka_datacenter_producer},ktm_lag_type=aggregate,kafka_topic_aggregate=#{kafka_topic_aggregate},kafka_consumer_group_aggregate=#{kafka_consumer_group_aggregate} kafka_aggregate_lag_ms=#{kafka_aggregate_lag_ms} #{kafka_logstash_influx_metric_time}"
  elsif (aggregate_valid == false && producer_valid == false)
    @logger.error("Error kafkatimemachine: Could not build valid response --> #{error_string_producer}, #{error_string_aggregate}")
    influx_line_protocol = nil
  end

  if (!influx_line_protocol.nil? && @enable_log == true)
    file_output(influx_line_protocol)
  end

  # Only emit when a valid line was built; event.sprintf(nil) would raise
  @on_event.call(event, event.sprintf(influx_line_protocol)) unless influx_line_protocol.nil?

end
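
encode treats the append and read times as millisecond epoch values, so each lag is a plain subtraction, while the trailing line protocol timestamp is in nanoseconds as InfluxDB expects. A worked sketch with assumed sample values (the numbers, datacenter, topic, and group names below are illustrative, not real output):

# Assumed sample metadata, in milliseconds since the epoch
kafka_append_time_producer        = 1654000000000
logstash_kafka_read_time_producer = 1654000000250

# Producer lag: broker append time to Logstash read time
kafka_producer_lag_ms = logstash_kafka_read_time_producer - kafka_append_time_producer
# => 250

# With only producer data valid, the emitted line would resemble:
# kafka_lag_time,meta_source=lma,meta_type=ktm,meta_datacenter=dc1,ktm_lag_type=producer,kafka_topic_producer=topic-a,kafka_consumer_group_producer=group-a kafka_producer_lag_ms=250 1654000000250000000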

#file_output(output_line) ⇒ Object



# File 'lib/logstash/codecs/kafkatimemachine.rb', line 15

def file_output(output_line)

  # Limit the file to 5 MB (5 * 1024 * 1024 bytes) to protect the host's disk
  max_file_size = 5242880

  # Open file and append until max size reached
  File.open("/tmp/kafkatimemachine.txt", "a") do |f|
    if (f.size <= max_file_size)
      f.puts(output_line)
      @write_end_of_log = true
    elsif (f.size > max_file_size && @write_end_of_log == true)
      f.puts("Maximum file size of #{max_file_size} bytes reached; delete /tmp/kafkatimemachine.txt to resume writing")
      @write_end_of_log = false
    end
  end

end
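
The @write_end_of_log flag makes the size-limit notice one-shot: it is re-armed on every normal append and cleared once the notice is written, so the cap message appears at most once per overflow. A standalone sketch of the same pattern (the path and messages here are illustrative):

# One-shot sentinel pattern, assuming a writable /tmp path
write_end_of_log = true
max_file_size = 5242880  # same 5 MB cap as file_output

File.open("/tmp/example.txt", "a") do |f|
  if f.size <= max_file_size
    f.puts("payload line")
    write_end_of_log = true   # re-arm the notice
  elsif write_end_of_log
    f.puts("cap reached; delete the file to resume")
    write_end_of_log = false  # write the notice only once
  end
end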

#register ⇒ Object



# File 'lib/logstash/codecs/kafkatimemachine.rb', line 34

def register

end