Class: OpenC3::PacketLogWriter

Inherits:
LogWriter show all
Includes:
PacketLogConstants
Defined in:
lib/openc3/logs/packet_log_writer.rb

Overview

Creates a packet log. Can automatically cycle the log based on an elasped time period or when the log file reaches a predefined size.

Direct Known Subclasses

BufferedPacketLogWriter

Constant Summary

Constants included from PacketLogConstants

OpenC3::PacketLogConstants::COSMOS2_FILE_HEADER, OpenC3::PacketLogConstants::COSMOS4_FILE_HEADER, OpenC3::PacketLogConstants::OPENC3_CBOR_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_CMD_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_EXTRA_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_EXTRA_LENGTH_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_EXTRA_LENGTH_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_EXTRA_LENGTH_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_FILE_HEADER, OpenC3::PacketLogConstants::OPENC3_HEADER_LENGTH, OpenC3::PacketLogConstants::OPENC3_ID_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_ID_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_INDEX_HEADER, OpenC3::PacketLogConstants::OPENC3_JSON_PACKET_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_KEY_MAP_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_KEY_MAP_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_KEY_MAP_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_KEY_MAP_SECONDARY_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_MAX_PACKET_INDEX, OpenC3::PacketLogConstants::OPENC3_MAX_TARGET_INDEX, OpenC3::PacketLogConstants::OPENC3_OFFSET_MARKER_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_OFFSET_MARKER_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_OFFSET_MARKER_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_OFFSET_MARKER_SECONDARY_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_PACKET_DECLARATION_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_PACKET_DECLARATION_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_PACKET_DECLARATION_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_PACKET_DECLARATION_SECONDARY_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_PACKET_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_PACKET_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_PACKET_SECONDARY_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_PRIMARY_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_RAW_PACKET_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_RECEIVED_TIME_FIXED_SIZE, OpenC3::PacketLogConstants::OPENC3_RECEIVED_TIME_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_RECEIVED_TIME_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_RECEIVED_TIME_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_STORED_FLAG_MASK, OpenC3::PacketLogConstants::OPENC3_TARGET_DECLARATION_ENTRY_TYPE_MASK, OpenC3::PacketLogConstants::OPENC3_TARGET_DECLARATION_PACK_DIRECTIVE, OpenC3::PacketLogConstants::OPENC3_TARGET_DECLARATION_PACK_ITEMS, OpenC3::PacketLogConstants::OPENC3_TARGET_DECLARATION_SECONDARY_FIXED_SIZE

Constants inherited from LogWriter

LogWriter::CLEANUP_DELAY, LogWriter::CYCLE_TIME_INTERVAL

Instance Attribute Summary collapse

Attributes inherited from LogWriter

#cleanup_offsets, #cleanup_times, #cycle_hour, #cycle_minute, #cycle_size, #cycle_time, #filename, #logging_enabled, #mutex, #start_time

Instance Method Summary collapse

Methods inherited from LogWriter

#create_unique_filename, #cycle_thread_body, #first_timestamp, #graceful_kill, #last_timestamp, #prepare_write, #shutdown, #start, #stop

Constructor Details

#initialize(remote_log_directory, label, logging_enabled = true, cycle_time = nil, cycle_size = 1_000_000_000, cycle_hour = nil, cycle_minute = nil, enforce_time_order = true, scope: $openc3_scope) ⇒ PacketLogWriter

Returns a new instance of PacketLogWriter.

Parameters:

  • remote_log_directory (String)

    The path to store the log files

  • label (String)

    Label to apply to the log filename

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size.

  • cycle_size (Integer) (defaults to: 1_000_000_000)

    The size in bytes before creating a new log file. This can be combined with cycle_time.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/openc3/logs/packet_log_writer.rb', line 48

def initialize(
  remote_log_directory,
  label,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1_000_000_000,
  cycle_hour = nil,
  cycle_minute = nil,
  enforce_time_order = true,
  scope: $openc3_scope
)
  super(
    remote_log_directory,
    logging_enabled,
    cycle_time,
    cycle_size,
    cycle_hour,
    cycle_minute,
    enforce_time_order
  )
  @label = label
  @cmd_packet_table = {}
  @tlm_packet_table = {}
  @key_map_table = {}
  @target_dec_entries = []
  @packet_dec_entries = []
  @next_packet_index = 0
  @target_indexes = {}
  @next_target_index = 0
  @data_format = :CBOR # Default to CBOR for improved compression
  @target_id_cache = {}
  @packet_id_cache = {}
  @scope = scope
end

Instance Attribute Details

#data_formatObject

Returns the value of attribute data_format.



34
35
36
# File 'lib/openc3/logs/packet_log_writer.rb', line 34

def data_format
  @data_format
end

Instance Method Details

#bucket_filenameObject



344
345
346
# File 'lib/openc3/logs/packet_log_writer.rb', line 344

def bucket_filename
  "#{first_timestamp}__#{last_timestamp}__#{@label}" + extension
end

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn’t critical so we just log an error Returns threads that moves log to bucket



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/openc3/logs/packet_log_writer.rb', line 144

def close_file(take_mutex = true)
  threads = []
  @mutex.lock if take_mutex
  begin
    # Need to write the OFFSET_MARKER for each packet
    @last_offsets.each do |redis_topic, last_offset|
      write_entry(:OFFSET_MARKER, nil, nil, nil, nil, nil, last_offset + ',' + redis_topic, nil) if @file
    end

    threads.concat(super(false))

  ensure
    @mutex.unlock if take_mutex
  end
  return threads
end

#extensionObject



348
349
350
# File 'lib/openc3/logs/packet_log_writer.rb', line 348

def extension
  '.bin'.freeze
end

#get_packet_index(cmd_or_tlm, target_name, packet_name, entry_type, data) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/openc3/logs/packet_log_writer.rb', line 161

def get_packet_index(cmd_or_tlm, target_name, packet_name, entry_type, data)
  if cmd_or_tlm == :CMD
    target_table = @cmd_packet_table[target_name]
  else
    target_table = @tlm_packet_table[target_name]
  end
  if target_table
    packet_index = target_table[packet_name]
    return packet_index if packet_index
  else
    # New packet_table entry needed
    target_table = {}
    if cmd_or_tlm == :CMD
      @cmd_packet_table[target_name] = target_table
    else
      @tlm_packet_table[target_name] = target_table
    end
    id = nil
    unless ENV['OPENC3_NO_STORE']
      id = @target_id_cache[target_name]
      unless id
        target = TargetModel.get(name: target_name, scope: @scope)
        id = target["id"] if target
        @target_id_cache[target_name] = id
      end
    end
    write_entry(:TARGET_DECLARATION, cmd_or_tlm, target_name, packet_name, nil, nil, nil, id)
  end

  # New target_table entry needed
  packet_index = @next_packet_index
  if packet_index > OPENC3_MAX_PACKET_INDEX
    raise "Packet Index Overflow"
  end

  target_table[packet_name] = packet_index
  @next_packet_index += 1

  id = nil
  begin
    unless ENV['OPENC3_NO_STORE']
      cache_key = "#{cmd_or_tlm}__#{target_name}__#{packet_name}"
      id = @packet_id_cache[cache_key]
      unless id
        target_model_packet = TargetModel.packet(target_name, packet_name, type: cmd_or_tlm, scope: @scope)
        id = target_model_packet["config_name"] if target_model_packet
        @packet_id_cache[cache_key] = id
      end
    end
  rescue
    # No packet def
  end
  write_entry(:PACKET_DECLARATION, cmd_or_tlm, target_name, packet_name, nil, nil, nil, id)
  if entry_type == :JSON_PACKET
    key_map = @key_map_table[packet_index]
    unless key_map
      parsed = data
      parsed = JSON.parse(data, :allow_nan => true, :create_additions => true) if String === parsed
      keys = parsed.keys
      key_map = {}
      reverse_key_map = {}
      keys.each_with_index do |key, index|
        key_map[index.to_s] = key
        reverse_key_map[key] = index.to_s
      end
      @key_map_table[packet_index] = reverse_key_map
      if @data_format == :CBOR
        write_entry(:KEY_MAP, cmd_or_tlm, target_name, packet_name, nil, nil, key_map.to_cbor, nil)
      else # JSON
        write_entry(:KEY_MAP, cmd_or_tlm, target_name, packet_name, nil, nil, JSON.generate(key_map, :allow_nan => true), nil)
      end
    end
  end
  return packet_index
end

#start_new_fileObject

Starting a new file is a critical operation so the entire method is wrapped with a rescue and handled with handle_critical_exception Assumes mutex has already been taken



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/openc3/logs/packet_log_writer.rb', line 123

def start_new_file
  super
  @file.write(OPENC3_FILE_HEADER)
  @file_size += OPENC3_FILE_HEADER.length

  @cmd_packet_table = {}
  @tlm_packet_table = {}
  @key_map_table = {}
  @next_packet_index = 0
  @target_indexes = {}
  @next_target_index = 0
  @target_dec_entries = []
  @packet_dec_entries = []
rescue => err
  Logger.error "Error starting new log file: #{err.formatted}"
  @logging_enabled = false
  OpenC3.handle_critical_exception(err)
end

#write(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id = nil, redis_topic = nil, redis_offset = '0-0', take_mutex: true, allow_new_file: true, received_time_nsec_since_epoch: nil, extra: nil) ⇒ Object

Write a packet to the log file.

If no log file currently exists in the filesystem, a new file will be created.

Parameters:

  • entry_type (Symbol)

    Type of entry to write. Must be one of :TARGET_DECLARATION, :PACKET_DECLARATION, :RAW_PACKET, :JSON_PACKET, :OFFSET_MARKER, :KEY_MAP

  • cmd_or_tlm (Symbol)

    One of :CMD or :TLM

  • target_name (String)

    Name of the target

  • packet_name (String)

    Name of the packet

  • time_nsec_since_epoch (Integer)

    64 bit integer nsecs since EPOCH

  • stored (Boolean)

    Whether this data is stored telemetry

  • data (String)

    Binary string of data

  • id (Integer) (defaults to: nil)

    Target ID

  • redis_offset (Integer) (defaults to: '0-0')

    The offset of this packet in its Redis stream



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/openc3/logs/packet_log_writer.rb', line 98

def write(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id = nil, redis_topic = nil, redis_offset = '0-0', take_mutex: true, allow_new_file: true, received_time_nsec_since_epoch: nil, extra: nil)
  return if !@logging_enabled

  @mutex.lock if take_mutex
  begin
    if entry_type == :RAW_PACKET or entry_type == :JSON_PACKET
      # Only care about the timestamps on the real packets being in order
      process_out_of_order = true
    else
      # Metadata timestamps don't matter
      process_out_of_order = false
    end
    prepare_write(time_nsec_since_epoch, data.length, redis_topic, redis_offset, allow_new_file: allow_new_file, process_out_of_order: process_out_of_order)
    write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id, received_time_nsec_since_epoch: received_time_nsec_since_epoch, extra: extra) if @file
  ensure
    @mutex.unlock if take_mutex
  end
rescue => err
  Logger.instance.error "Error writing #{@filename} : #{err.formatted}"
  OpenC3.handle_critical_exception(err)
end

#write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id, received_time_nsec_since_epoch: nil, extra: nil) ⇒ Object

Raises:

  • (ArgumentError)


237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/openc3/logs/packet_log_writer.rb', line 237

def write_entry(entry_type, cmd_or_tlm, target_name, packet_name, time_nsec_since_epoch, stored, data, id, received_time_nsec_since_epoch: nil, extra: nil)
  raise ArgumentError.new("Length of id must be 64, got #{id.length}") if id and id.length != 64 # 64 hex digits, gets packed to 32 bytes with .pack('H*')

  length = OPENC3_PRIMARY_FIXED_SIZE
  flags = 0
  flags |= OPENC3_STORED_FLAG_MASK if stored
  flags |= OPENC3_ID_FLAG_MASK if id
  case entry_type
  when :TARGET_DECLARATION
    target_index = @next_target_index
    @target_indexes[target_name] = target_index
    @next_target_index += 1
    if target_index > OPENC3_MAX_TARGET_INDEX
      raise "Target Index Overflow"
    end

    flags |= OPENC3_TARGET_DECLARATION_ENTRY_TYPE_MASK
    length += OPENC3_TARGET_DECLARATION_SECONDARY_FIXED_SIZE + target_name.length
    length += OPENC3_ID_FIXED_SIZE if id
    @entry.clear
    @entry << [length, flags].pack(OPENC3_TARGET_DECLARATION_PACK_DIRECTIVE) << target_name
    @entry << [id].pack('H*') if id
    @target_dec_entries << @entry.dup
  when :PACKET_DECLARATION
    target_index = @target_indexes[target_name]
    flags |= OPENC3_PACKET_DECLARATION_ENTRY_TYPE_MASK
    if cmd_or_tlm == :CMD
      flags |= OPENC3_CMD_FLAG_MASK
    end
    length += OPENC3_PACKET_DECLARATION_SECONDARY_FIXED_SIZE + packet_name.length
    length += OPENC3_ID_FIXED_SIZE if id
    @entry.clear
    @entry << [length, flags, target_index].pack(OPENC3_PACKET_DECLARATION_PACK_DIRECTIVE) << packet_name
    @entry << [id].pack('H*') if id
    @packet_dec_entries << @entry.dup
  when :KEY_MAP
    flags |= OPENC3_KEY_MAP_ENTRY_TYPE_MASK
    flags |= OPENC3_CBOR_FLAG_MASK if @data_format == :CBOR
    length += OPENC3_KEY_MAP_SECONDARY_FIXED_SIZE + data.length
    packet_index = get_packet_index(cmd_or_tlm, target_name, packet_name, entry_type, data)
    @entry.clear
    @entry << [length, flags, packet_index].pack(OPENC3_KEY_MAP_PACK_DIRECTIVE) << data
  when :OFFSET_MARKER
    flags |= OPENC3_OFFSET_MARKER_ENTRY_TYPE_MASK
    length += OPENC3_OFFSET_MARKER_SECONDARY_FIXED_SIZE + data.length
    @entry.clear
    @entry << [length, flags].pack(OPENC3_OFFSET_MARKER_PACK_DIRECTIVE) << data
  when :RAW_PACKET, :JSON_PACKET
    target_name = 'UNKNOWN'.freeze unless target_name
    packet_name = 'UNKNOWN'.freeze unless packet_name
    packet_index = get_packet_index(cmd_or_tlm, target_name, packet_name, entry_type, data)
    if entry_type == :RAW_PACKET
      flags |= OPENC3_RAW_PACKET_ENTRY_TYPE_MASK
    else
      flags |= OPENC3_JSON_PACKET_ENTRY_TYPE_MASK
      key_map = @key_map_table[packet_index]
      if key_map
        # Compress data using key map
        data = JSON.parse(data, :allow_nan => true, :create_additions => true) if String === data
        compressed = {}
        data.each do |key, value|
          compressed_key = key_map[key]
          compressed_key = key unless compressed_key
          compressed[compressed_key] = value
        end
        if @data_format == :CBOR
          flags |= OPENC3_CBOR_FLAG_MASK
          data = compressed.to_cbor
        else
          data = JSON.generate(compressed, :allow_nan => true)
        end
      end
    end
    if cmd_or_tlm == :CMD
      flags |= OPENC3_CMD_FLAG_MASK
    end
    if received_time_nsec_since_epoch
      flags |= OPENC3_RECEIVED_TIME_FLAG_MASK
      length += OPENC3_RECEIVED_TIME_FIXED_SIZE
    end
    extra_encoded = nil
    if extra
      flags |= OPENC3_EXTRA_FLAG_MASK
      extra = JSON.parse(extra, :allow_nan => true, :create_additions => true) if String === extra
      length += OPENC3_EXTRA_LENGTH_FIXED_SIZE
      if @data_format == :CBOR
        extra_encoded = extra.as_json.to_cbor
      else
        extra_encoded = JSON.generate(extra.as_json, :allow_nan => true)
      end
      length += extra_encoded.length
    end
    length += OPENC3_PACKET_SECONDARY_FIXED_SIZE + data.length
    @entry.clear
    @entry << [length, flags, packet_index, time_nsec_since_epoch].pack(OPENC3_PACKET_PACK_DIRECTIVE)
    @entry << [received_time_nsec_since_epoch].pack(OPENC3_RECEIVED_TIME_PACK_DIRECTIVE) if received_time_nsec_since_epoch
    @entry << [extra_encoded.length].pack(OPENC3_EXTRA_LENGTH_PACK_DIRECTIVE) << extra_encoded if extra_encoded
    @entry << data.force_encoding('ASCII-8BIT')
    @first_time = time_nsec_since_epoch if !@first_time or time_nsec_since_epoch < @first_time
    @last_time = time_nsec_since_epoch if !@last_time or time_nsec_since_epoch > @last_time
  else
    raise "Unknown entry_type: #{entry_type}"
  end
  @file.write(@entry)
  @file_size += @entry.length
end