Class: OpenC3::LogMicroservice
- Inherits:
- Microservice
- Object
- Microservice
- OpenC3::LogMicroservice
- Defined in:
- lib/openc3/microservices/log_microservice.rb
Constant Summary collapse
- DEFAULT_BUFFER_DEPTH =
1 minute at 1Hz
60
Instance Attribute Summary
Attributes inherited from Microservice
#count, #custom, #error, #logger, #microservice_status_thread, #name, #scope, #secrets, #state
Instance Method Summary collapse
-
#initialize(name) ⇒ LogMicroservice
constructor
A new instance of LogMicroservice.
- #log_data(topic, msg_id, msg_hash, redis) ⇒ Object
- #run ⇒ Object
- #setup_plws ⇒ Object
- #shutdown ⇒ Object
Methods inherited from Microservice
#as_json, #microservice_cmd, run, #setup_microservice_topic
Constructor Details
#initialize(name) ⇒ LogMicroservice
Returns a new instance of LogMicroservice.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/openc3/microservices/log_microservice.rb', line 32

# Configures the logger from the [keyword, value] pairs in @config['options'],
# fills in defaults for the optional settings, and publishes the initial
# counter values through @metric.
#
# Recognized keywords (matched case-insensitively):
#   RAW_OR_DECOM - required, stored as a symbol
#   CMD_OR_TLM   - required, stored as a symbol
#   CYCLE_TIME   - maximum time between log files (default 600)
#   CYCLE_SIZE   - maximum size of a log file (default 50_000_000)
#   BUFFER_DEPTH - buffer depth to write in time order (default DEFAULT_BUFFER_DEPTH)
# Any other keyword is reported through @logger and otherwise ignored.
#
# @param name microservice name, forwarded unchanged to the parent constructor
# @raise [RuntimeError] if RAW_OR_DECOM or CMD_OR_TLM was never supplied
def initialize(name)
  super(name)

  @config['options'].each do |option|
    keyword = option[0].upcase
    if keyword == 'RAW_OR_DECOM'
      @raw_or_decom = option[1].intern
    elsif keyword == 'CMD_OR_TLM'
      @cmd_or_tlm = option[1].intern
    elsif keyword == 'CYCLE_TIME'
      # Maximum time between log files
      @cycle_time = option[1].to_i
    elsif keyword == 'CYCLE_SIZE'
      # Maximum size of a log file
      @cycle_size = option[1].to_i
    elsif keyword == 'BUFFER_DEPTH'
      # Buffer depth to write in time order
      @buffer_depth = option[1].to_i
    else
      @logger.error("Unknown option passed to microservice #{@name}: #{option}")
    end
  end

  # Both of these must have been provided by the options above
  if !@raw_or_decom || !@cmd_or_tlm
    raise "Microservice #{@name} not fully configured"
  end

  # Together these cap a log file at 10 minutes or ~50MB, whichever comes first
  @cycle_time ||= 600
  @cycle_size ||= 50_000_000
  @buffer_depth ||= DEFAULT_BUFFER_DEPTH

  # Publish the starting counter values (@count is not assigned in this method)
  @error_count = 0
  @metric.set(name: 'log_total', value: @count, type: 'counter')
  @metric.set(name: 'log_error_total', value: @error_count, type: 'counter')
end
Instance Method Details
#log_data(topic, msg_id, msg_hash, redis) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/openc3/microservices/log_microservice.rb', line 100

# Hands one stream message to the buffered log writer selected by the target
# name in the topic and by whether the message is flagged as stored.
#
# Any StandardError raised along the way is recorded in @error, logged, and
# counted in the 'log_error_total' metric rather than propagated.
#
# @param topic topic name; after removing curly braces it is split on "__" and
#   fields 2 and 3 are used as the target and packet names
# @param msg_id message id; the part before the first '-' is read as
#   milliseconds since the epoch
# @param msg_hash message fields ("stored", "time", "received_time", "extra",
#   plus "buffer" or "json_data" depending on @raw_or_decom)
# @param redis not used by this method
def log_data(topic, msg_id, msg_hash, redis)
  # Report how far behind the stream entry this write is happening
  stream_seconds = msg_id.split('-')[0].to_i / 1000.0
  lag = Time.now.to_f - stream_seconds
  @metric.set(name: 'log_topic_delta_seconds', value: lag, type: 'gauge', unit: 'seconds', help: 'Delta time between data written to stream and log start')

  # Remove the redis hashtag curly braces, then pick out the name fields
  _, _, target_name, packet_name = topic.gsub(/{|}/, '').split("__")

  stored_flag = ConfigParser.handle_true_false(msg_hash["stored"])
  rt_or_stored = stored_flag ? :STORED : :RT

  packet_type, data_key =
    if @raw_or_decom == :RAW
      [:RAW_PACKET, "buffer"]
    else # :DECOM
      [:JSON_PACKET, "json_data"]
    end

  # Only convert the received time when the message actually carries one
  received_nsec = msg_hash["received_time"]
  received_nsec &&= received_nsec.to_i

  writer = @plws[target_name][rt_or_stored]
  writer.buffered_write(
    packet_type,
    @cmd_or_tlm,
    target_name,
    packet_name,
    msg_hash["time"].to_i,
    rt_or_stored == :STORED,
    msg_hash[data_key],
    nil,
    topic,
    msg_id,
    received_time_nsec_since_epoch: received_nsec,
    extra: msg_hash['extra']
  )
rescue StandardError => err
  @error = err
  @logger.error("#{@name} error: #{err.formatted}")
  @error_count += 1
  @metric.set(name: 'log_error_total', value: @error_count, type: 'counter')
end
#run ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/openc3/microservices/log_microservice.rb', line 63

# Main loop: sets up the log writers and the microservice topic, then keeps
# reading @topics until @cancel_thread becomes truthy. Messages arriving on
# @microservice_topic go to microservice_cmd; every other message is logged.
# Each handled message (of either kind) bumps the 'log_total' counter.
def run
  setup_plws()
  setup_microservice_topic()

  until @cancel_thread
    Topic.read_topics(@topics) do |topic, msg_id, msg_hash, redis|
      # Stop handling the current batch as soon as cancellation is requested
      break if @cancel_thread

      if topic != @microservice_topic
        log_data(topic, msg_id, msg_hash, redis)
      else
        microservice_cmd(topic, msg_id, msg_hash, redis)
      end

      @count += 1
      @metric.set(name: 'log_total', value: @count, type: 'counter')
    end
  end
end
#setup_plws ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/openc3/microservices/log_microservice.rb', line 82

# Builds @plws: a hash keyed by target name whose values hold one :RT and one
# :STORED BufferedPacketLogWriter. Writers are created the first time a target
# name is seen in @topics; later topics for the same target reuse them.
def setup_plws
  @plws = {}
  type = @raw_or_decom.to_s.downcase
  direction = @cmd_or_tlm.to_s.downcase

  @topics.each do |topic|
    # Remove the redis hashtag curly braces; field 0 is the scope, field 2 the target
    topic_scope, _, target_name = topic.gsub(/{|}/, '').split("__")
    next if @plws[target_name]

    remote_log_directory = "#{topic_scope}/#{type}_logs/#{direction}/#{target_name}"
    build_writer = lambda do |label|
      BufferedPacketLogWriter.new(remote_log_directory, label, true, @cycle_time, @cycle_size, nil, nil, true, @buffer_depth)
    end

    realtime_writer = build_writer.call("#{topic_scope}__#{target_name}__ALL__rt__#{type}")
    stored_writer = build_writer.call("#{topic_scope}__#{target_name}__ALL__stored__#{type}")
    @plws[target_name] = { :RT => realtime_writer, :STORED => stored_writer }
  end
end
#shutdown ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/openc3/microservices/log_microservice.rb', line 128

# Shuts down every log writer in @plws, waits for the threads those shutdowns
# hand back, and then runs the parent class's shutdown.
#
# Safe to call before setup_plws has run: @plws is only assigned there, so a
# nil @plws is treated as "no writers" and the parent shutdown still happens.
def shutdown
  # Make sure all the existing logs are properly closed down
  threads = []
  (@plws || {}).each_value do |plw_hash|
    plw_hash.each_value do |plw|
      # Array() tolerates a writer that returns nil or a single thread
      threads.concat(Array(plw.shutdown))
    end
  end

  # Wait for all the logging threads to move files to buckets
  threads.flatten.compact.each(&:join)

  super()
end