Class: OpenC3::LogWriter

Inherits:
Object show all
Defined in:
lib/openc3/logs/log_writer.rb

Overview

Creates a log. Can automatically cycle the log based on an elasped time period or when the log file reaches a predefined size.

Direct Known Subclasses

PacketLogWriter, StreamLog, TextLogWriter

Constant Summary collapse

CYCLE_TIME_INTERVAL =

The cycle time interval. Cycle times are only checked at this level of granularity.

10
CLEANUP_DELAY =

Delay in seconds before trimming Redis streams

60
@@mutex =

Mutex protecting class variables

Mutex.new
@@instances =

Array of instances used to keep track of cycling logs

[]
@@cycle_thread =

Thread used to cycle logs across all log writers

nil
@@cycle_sleeper =

Sleeper used to delay cycle thread

nil

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(remote_log_directory, logging_enabled = true, cycle_time = nil, cycle_size = 1_000_000_000, cycle_hour = nil, cycle_minute = nil, enforce_time_order = true) ⇒ LogWriter

Returns a new instance of LogWriter.

Parameters:

  • remote_log_directory (String)

    The path to store the log files

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size.

  • cycle_size (Integer) (defaults to: 1_000_000_000)

    The size in bytes before creating a new log file. This can be combined with cycle_time.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/openc3/logs/log_writer.rb', line 97

def initialize(
  remote_log_directory,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1_000_000_000,
  cycle_hour = nil,
  cycle_minute = nil,
  enforce_time_order = true
)
  @remote_log_directory = remote_log_directory
  @logging_enabled = ConfigParser.handle_true_false(logging_enabled)
  @cycle_time = ConfigParser.handle_nil(cycle_time)
  if @cycle_time
    @cycle_time = Integer(@cycle_time)
    raise "cycle_time must be >= #{CYCLE_TIME_INTERVAL}" if @cycle_time < CYCLE_TIME_INTERVAL
  end
  @cycle_size = ConfigParser.handle_nil(cycle_size)
  @cycle_size = Integer(@cycle_size) if @cycle_size
  @cycle_hour = ConfigParser.handle_nil(cycle_hour)
  @cycle_hour = Integer(@cycle_hour) if @cycle_hour
  @cycle_minute = ConfigParser.handle_nil(cycle_minute)
  @cycle_minute = Integer(@cycle_minute) if @cycle_minute
  @enforce_time_order = ConfigParser.handle_true_false(enforce_time_order)
  @out_of_order = false
  @mutex = Mutex.new
  @file = nil
  @file_size = 0
  @filename = nil
  @start_time = Time.now.utc
  @first_time = nil
  @last_time = nil
  @cancel_threads = false
  @last_offsets = {}
  @cleanup_offsets = []
  @cleanup_times = []
  @previous_time_nsec_since_epoch = nil
  @tmp_dir = Dir.mktmpdir

  # This is an optimization to avoid creating a new entry object
  # each time we create an entry which we do a LOT!
  @entry = String.new

  # Always make sure there is a cycle thread - (because it does trimming)
  @@mutex.synchronize do
    @@instances << self

    unless @@cycle_thread
      @@cycle_thread = OpenC3.safe_thread("Log cycle") do
        cycle_thread_body()
      end
    end
  end
end

Instance Attribute Details

#cleanup_offsetsObject

Redis offsets for each topic to cleanup



62
63
64
# File 'lib/openc3/logs/log_writer.rb', line 62

def cleanup_offsets
  @cleanup_offsets
end

#cleanup_timesObject

Time at which to cleanup



65
66
67
# File 'lib/openc3/logs/log_writer.rb', line 65

def cleanup_times
  @cleanup_times
end

#cycle_hourObject (readonly)

Returns the value of attribute cycle_hour.



49
50
51
# File 'lib/openc3/logs/log_writer.rb', line 49

def cycle_hour
  @cycle_hour
end

#cycle_minuteObject (readonly)

Returns the value of attribute cycle_minute.



53
54
55
# File 'lib/openc3/logs/log_writer.rb', line 53

def cycle_minute
  @cycle_minute
end

#cycle_sizeObject (readonly)

Returns the value of attribute cycle_size.



44
45
46
# File 'lib/openc3/logs/log_writer.rb', line 44

def cycle_size
  @cycle_size
end

#cycle_timeObject (readonly)

Returns the value of attribute cycle_time.



40
41
42
# File 'lib/openc3/logs/log_writer.rb', line 40

def cycle_time
  @cycle_time
end

#filenameString (readonly)

Returns The filename of the packet log.

Returns:

  • (String)

    The filename of the packet log



33
34
35
# File 'lib/openc3/logs/log_writer.rb', line 33

def filename
  @filename
end

#logging_enabledtrue/false (readonly)

Returns Whether logging is enabled.

Returns:

  • (true/false)

    Whether logging is enabled



36
37
38
# File 'lib/openc3/logs/log_writer.rb', line 36

def logging_enabled
  @logging_enabled
end

#mutexMutex (readonly)

Returns Instance mutex protecting file.

Returns:

  • (Mutex)

    Instance mutex protecting file



59
60
61
# File 'lib/openc3/logs/log_writer.rb', line 59

def mutex
  @mutex
end

#start_timeTime (readonly)

Returns Time that the current log file started.

Returns:

  • (Time)

    Time that the current log file started



56
57
58
# File 'lib/openc3/logs/log_writer.rb', line 56

def start_time
  @start_time
end

Instance Method Details

#bucket_filenameObject



347
348
349
# File 'lib/openc3/logs/log_writer.rb', line 347

def bucket_filename
  "#{first_timestamp}__#{last_timestamp}" + extension
end

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn’t critical so we just log an error. NOTE: This also trims the Redis stream to keep a full file’s worth of data in the stream. This is what prevents continuous stream growth. Returns thread that moves log to bucket



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/openc3/logs/log_writer.rb', line 312

def close_file(take_mutex = true)
  threads = []
  @mutex.lock if take_mutex
  begin
    if @file
      begin
        @file.close unless @file.closed?
        Logger.debug "Log File Closed : #{@filename}"
        date = first_timestamp[0..7] # YYYYMMDD
        bucket_key = File.join(@remote_log_directory, date, bucket_filename())
        # Cleanup timestamps here so they are unset for the next file
        @first_time = nil
        @last_time = nil
        threads << BucketUtilities.move_log_file_to_bucket(@filename, bucket_key)
        # Now that the file is in storage, trim the Redis stream after a delay
        @cleanup_offsets << {}
        @last_offsets.each do |redis_topic, last_offset|
          @cleanup_offsets[-1][redis_topic] = last_offset
        end
        @cleanup_times << Time.now + CLEANUP_DELAY
        @last_offsets.clear
      rescue Exception => err
        Logger.error "Error closing #{@filename} : #{err.formatted}"
      end

      @file = nil
      @file_size = 0
      @filename = nil
    end
  ensure
    @mutex.unlock if take_mutex
  end
  return threads
end

#create_unique_filename(ext = extension) ⇒ Object

implementation details



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/openc3/logs/log_writer.rb', line 185

def create_unique_filename(ext = extension)
  # Create a filename that doesn't exist
  attempt = nil
  while true
    filename_parts = [attempt]
    filename_parts.unshift @label if @label
    filename = File.join(@tmp_dir, File.build_timestamped_filename([@label, attempt], ext))
    if File.exist?(filename)
      attempt ||= 0
      attempt += 1
      Logger.warn("Unexpected file name conflict: #{filename}")
    else
      return filename
    end
  end
end

#cycle_thread_bodyObject



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/openc3/logs/log_writer.rb', line 202

def cycle_thread_body
  @@cycle_sleeper = Sleeper.new
  while true
    start_time = Time.now
    @@mutex.synchronize do
      @@instances.each do |instance|
        # The check against start_time needs to be mutex protected to prevent a packet coming in between the check
        # and closing the file
        instance.mutex.synchronize do
          utc_now = Time.now.utc
          # Logger.debug("start:#{@start_time.to_f} now:#{utc_now.to_f} cycle:#{@cycle_time} new:#{(utc_now - @start_time) > @cycle_time}")
          if instance.logging_enabled and instance.filename # Logging and file opened
            # Cycle based on total time logging
            if (instance.cycle_time and (utc_now - instance.start_time) > instance.cycle_time)
              Logger.debug("Log writer start new file due to cycle time")
              instance.close_file(false)
            # Cycle daily at a specific time
            elsif (instance.cycle_hour and instance.cycle_minute and utc_now.hour == instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.yday != utc_now.yday)
              Logger.debug("Log writer start new file daily")
              instance.close_file(false)
            # Cycle hourly at a specific time
            elsif (instance.cycle_minute and not instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.hour != utc_now.hour)
              Logger.debug("Log writer start new file hourly")
              instance.close_file(false)
            end
          end

          # Check for cleanup time
          indexes_to_clear = []
          instance.cleanup_times.each_with_index do |cleanup_time, index|
            if cleanup_time <= utc_now
              # Now that the file is in S3, trim the Redis stream up until the previous file.
              # This keeps one minute of data in Redis
              instance.cleanup_offsets[index].each do |redis_topic, cleanup_offset|
                Topic.trim_topic(redis_topic, cleanup_offset)
              end
              indexes_to_clear << index
            end
          end
          if indexes_to_clear.length > 0
            indexes_to_clear.each do |index|
              instance.cleanup_offsets[index] = nil
              instance.cleanup_times[index] = nil
            end
            instance.cleanup_offsets.compact!
            instance.cleanup_times.compact!
          end
        end
      end
    end

    # Only check whether to cycle at a set interval
    run_time = Time.now - start_time
    sleep_time = CYCLE_TIME_INTERVAL - run_time
    sleep_time = 0 if sleep_time < 0
    break if @@cycle_sleeper.sleep(sleep_time)
  end
end

#extensionObject



351
352
353
# File 'lib/openc3/logs/log_writer.rb', line 351

def extension
  '.log'.freeze
end

#first_timestampObject



355
356
357
# File 'lib/openc3/logs/log_writer.rb', line 355

def first_timestamp
  Time.from_nsec_from_epoch(@first_time).to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#graceful_killObject



179
180
181
# File 'lib/openc3/logs/log_writer.rb', line 179

def graceful_kill
  @cancel_threads = true
end

#last_timestampObject



359
360
361
# File 'lib/openc3/logs/log_writer.rb', line 359

def last_timestamp
  Time.from_nsec_from_epoch(@last_time).to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#prepare_write(time_nsec_since_epoch, data_length, redis_topic = nil, redis_offset = nil, allow_new_file: true, process_out_of_order: true) ⇒ Object

process_out_of_order ignores the timestamps for the current entry (used to ignore timestamps on metadata entries, vs actual packets)



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/openc3/logs/log_writer.rb', line 286

def prepare_write(time_nsec_since_epoch, data_length, redis_topic = nil, redis_offset = nil, allow_new_file: true, process_out_of_order: true)
  # This check includes logging_enabled again because it might have changed since we acquired the mutex
  # Ensures new files based on size, and ensures always increasing time order in files
  if @logging_enabled
    if !@file
      Logger.debug("Log writer start new file because no file opened")
      start_new_file() if allow_new_file
    elsif @cycle_size and ((@file_size + data_length) > @cycle_size)
      Logger.debug("Log writer start new file due to cycle size #{@cycle_size}")
      start_new_file() if allow_new_file
    elsif process_out_of_order and @enforce_time_order and @previous_time_nsec_since_epoch and (@previous_time_nsec_since_epoch > time_nsec_since_epoch)
      # Warning: Creating new files here can cause lots of files to be created if packets make it through out of order
      # Changed to just a error to prevent file thrashing
      unless @out_of_order
        Logger.error("Log writer out of order time detected (increase buffer depth?): #{Time.from_nsec_from_epoch(@previous_time_nsec_since_epoch)} #{Time.from_nsec_from_epoch(time_nsec_since_epoch)}")
        @out_of_order = true
      end
    end
  end
  @last_offsets[redis_topic] = redis_offset if redis_topic and redis_offset # This is needed for the redis offset marker entry at the end of the log file
  @previous_time_nsec_since_epoch = time_nsec_since_epoch if process_out_of_order
end

#shutdownObject

Stop all logging, close the current log file, and kill the logging threads.



166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/openc3/logs/log_writer.rb', line 166

def shutdown
  threads = stop()
  @@mutex.synchronize do
    @@instances.delete(self)
    if @@instances.length <= 0
      @@cycle_sleeper.cancel if @@cycle_sleeper
      OpenC3.kill_thread(self, @@cycle_thread) if @@cycle_thread
      @@cycle_thread = nil
    end
  end
  return threads
end

#startObject

Starts a new log file by closing the existing log file. New log files are not created until packets are written by #write so this does not immediately create a log file on the filesystem.



154
155
156
# File 'lib/openc3/logs/log_writer.rb', line 154

def start
  @mutex.synchronize { close_file(false); @logging_enabled = true }
end

#start_new_fileObject

Starting a new log file is a critical operation so the entire method is wrapped with a rescue and handled with handle_critical_exception Assumes mutex has already been taken



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/openc3/logs/log_writer.rb', line 264

def start_new_file
  close_file(false) if @file

  # Start log file
  @filename = create_unique_filename()
  @file = File.new(@filename, 'wb')
  @file_size = 0

  @start_time = Time.now.utc
  @out_of_order = false
  @first_time = nil
  @last_time = nil
  @previous_time_nsec_since_epoch = nil
  Logger.debug "Log File Opened : #{@filename}"
rescue => err
  Logger.error "Error starting new log file: #{err.formatted}"
  @logging_enabled = false
  OpenC3.handle_critical_exception(err)
end

#stopObject

Stops all logging and closes the current log file.



159
160
161
162
163
# File 'lib/openc3/logs/log_writer.rb', line 159

def stop
  threads = nil
  @mutex.synchronize { threads = close_file(false); @logging_enabled = false; }
  return threads
end