Class: OpenC3::LogWriter

Inherits:
Object show all
Defined in:
lib/openc3/logs/log_writer.rb

Overview

Creates a log. Can automatically cycle the log based on an elapsed time period or when the log file reaches a predefined size.

Direct Known Subclasses

PacketLogWriter, StreamLog, TextLogWriter

Constant Summary collapse

CYCLE_TIME_INTERVAL =

The cycle time interval. Cycle times are only checked at this level of granularity.

10
CLEANUP_DELAY =

Delay in seconds before trimming Redis streams

60
@@mutex =

Mutex protecting class variables

Mutex.new
@@instances =

Array of instances used to keep track of cycling logs

[]
@@cycle_thread =

Thread used to cycle logs across all log writers

nil
@@cycle_sleeper =

Sleeper used to delay cycle thread

nil

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(remote_log_directory, logging_enabled = true, cycle_time = nil, cycle_size = 1_000_000_000, cycle_hour = nil, cycle_minute = nil, enforce_time_order = true, cycle_thread: true) ⇒ LogWriter

Returns a new instance of LogWriter.

Parameters:

  • remote_log_directory (String)

    The path to store the log files

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size.

  • cycle_size (Integer) (defaults to: 1_000_000_000)

    The size in bytes before creating a new log file. This can be combined with cycle_time.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/openc3/logs/log_writer.rb', line 91

# Builds a log writer and, unless cycle_thread is false, registers it with the
# shared cycle thread (starting that thread if it is not already running).
#
# @param remote_log_directory [String] The path to store the log files
# @param logging_enabled [Boolean] Whether to start with logging enabled
# @param cycle_time [Integer, nil] Seconds before creating a new log file.
#   Must be at least CYCLE_TIME_INTERVAL when given.
# @param cycle_size [Integer, nil] Size in bytes before creating a new log file
# @param cycle_hour [Integer, nil] Hour at which to cycle the log daily
#   (combined with cycle_minute)
# @param cycle_minute [Integer, nil] Minute at which to cycle the log
# @param enforce_time_order [Boolean] Whether prepare_write reports entries
#   whose time is earlier than the previous entry
# @param cycle_thread [Boolean] Whether to add this instance to the list
#   serviced by the shared cycle thread
# @raise [RuntimeError] if cycle_time is less than CYCLE_TIME_INTERVAL
def initialize(
  remote_log_directory,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1_000_000_000,
  cycle_hour = nil,
  cycle_minute = nil,
  enforce_time_order = true,
  cycle_thread: true
)
  @remote_log_directory = remote_log_directory
  @logging_enabled = ConfigParser.handle_true_false(logging_enabled)

  # Each cycle setting is normalized through ConfigParser and then coerced
  # to an Integer only when a value is actually present
  @cycle_time = ConfigParser.handle_nil(cycle_time)
  @cycle_time &&= Integer(@cycle_time)
  raise "cycle_time must be >= #{CYCLE_TIME_INTERVAL}" if @cycle_time && @cycle_time < CYCLE_TIME_INTERVAL

  @cycle_size = ConfigParser.handle_nil(cycle_size)
  @cycle_size &&= Integer(@cycle_size)
  @cycle_hour = ConfigParser.handle_nil(cycle_hour)
  @cycle_hour &&= Integer(@cycle_hour)
  @cycle_minute = ConfigParser.handle_nil(cycle_minute)
  @cycle_minute &&= Integer(@cycle_minute)
  @enforce_time_order = ConfigParser.handle_true_false(enforce_time_order)

  # Current file state
  @out_of_order = false
  @mutex = Mutex.new
  @file = @filename = nil
  @file_size = 0
  @start_time = Time.now.utc
  @first_time = @last_time = nil
  @cancel_threads = false

  # Redis stream bookkeeping used when files are closed
  @last_offsets = {}
  @cleanup_offsets = []
  @cleanup_times = []
  @previous_time_nsec_since_epoch = nil
  @tmp_dir = Dir.mktmpdir
  @wait_threads = []

  # This is an optimization to avoid creating a new entry object
  # each time we create an entry which we do a LOT!
  @entry = String.new

  return unless cycle_thread

  # Always make sure there is a cycle thread - (because it does trimming)
  @@mutex.synchronize do
    @@instances << self
    @@cycle_thread ||= OpenC3.safe_thread("Log cycle") { cycle_thread_body() }
  end
end

Instance Attribute Details

#cleanup_offsetsObject

Redis offsets for each topic to cleanup



56
57
58
# File 'lib/openc3/logs/log_writer.rb', line 56

# Redis offsets for each topic to cleanup. close_file appends one
# topic => offset Hash for each file it hands off to the bucket.
#
# @return [Array<Hash>] Pending cleanup offsets
def cleanup_offsets
  return @cleanup_offsets
end

#cleanup_timesObject

Time at which to cleanup



59
60
61
# File 'lib/openc3/logs/log_writer.rb', line 59

# Times at which to cleanup; each entry lines up by index with an entry
# in cleanup_offsets.
#
# @return [Array<Time>] Pending cleanup times
def cleanup_times
  return @cleanup_times
end

#cycle_hourObject (readonly)

Returns the value of attribute cycle_hour.



43
44
45
# File 'lib/openc3/logs/log_writer.rb', line 43

# @return [Integer, nil] Hour at which the log is cycled daily, or nil
def cycle_hour
  return @cycle_hour
end

#cycle_minuteObject (readonly)

Returns the value of attribute cycle_minute.



47
48
49
# File 'lib/openc3/logs/log_writer.rb', line 47

# @return [Integer, nil] Minute at which the log is cycled, or nil
def cycle_minute
  return @cycle_minute
end

#cycle_sizeObject (readonly)

Returns the value of attribute cycle_size.



38
39
40
# File 'lib/openc3/logs/log_writer.rb', line 38

# @return [Integer, nil] Size in bytes before a new log file is created, or nil
def cycle_size
  return @cycle_size
end

#cycle_timeObject (readonly)

Returns the value of attribute cycle_time.



34
35
36
# File 'lib/openc3/logs/log_writer.rb', line 34

# @return [Integer, nil] Seconds before a new log file is created, or nil
def cycle_time
  return @cycle_time
end

#filenameString (readonly)

Returns the filename of the packet log.

Returns:

  • (String)

    The filename of the packet log



27
28
29
# File 'lib/openc3/logs/log_writer.rb', line 27

# @return [String, nil] The filename of the packet log, or nil when no
#   file is currently open
def filename
  return @filename
end

#logging_enabledtrue/false (readonly)

Returns whether logging is enabled.

Returns:

  • (true/false)

    Whether logging is enabled



30
31
32
# File 'lib/openc3/logs/log_writer.rb', line 30

# @return [true/false] Whether logging is enabled
def logging_enabled
  return @logging_enabled
end

#mutexMutex (readonly)

Returns the instance mutex protecting the file.

Returns:

  • (Mutex)

    Instance mutex protecting file



53
54
55
# File 'lib/openc3/logs/log_writer.rb', line 53

# @return [Mutex] Instance mutex protecting file
def mutex
  return @mutex
end

#start_timeTime (readonly)

Returns the time that the current log file started.

Returns:

  • (Time)

    Time that the current log file started



50
51
52
# File 'lib/openc3/logs/log_writer.rb', line 50

# @return [Time] Time that the current log file started
def start_time
  return @start_time
end

Instance Method Details

#bucket_filenameObject



366
367
368
# File 'lib/openc3/logs/log_writer.rb', line 366

# Name used for the log file once it is moved to the bucket: the first and
# last entry timestamps joined by a double underscore, plus the extension.
#
# @return [String] Bucket filename
def bucket_filename
  time_span = "#{first_timestamp}__#{last_timestamp}"
  time_span + extension
end

#cleanupObject



176
177
178
179
180
181
# File 'lib/openc3/logs/log_writer.rb', line 176

# Removes the temporary directory created in the constructor. Does nothing
# if the directory has already been removed.
def cleanup
  return unless @tmp_dir

  # Second argument (force) is true, as in the original call
  FileUtils.remove_entry_secure(@tmp_dir, true)
  @tmp_dir = nil
end

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn’t critical, so errors are only logged. NOTE: This also schedules trimming of the Redis stream (after a delay) so that a full file’s worth of data is kept in the stream. This is what prevents continuous stream growth. Returns the list of threads that are moving log files to the bucket.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/openc3/logs/log_writer.rb', line 316

# Closes the current log file (if any), hands it to BucketUtilities to be
# moved into the bucket, and schedules trimming of the Redis streams.
# Closing a log file isn't critical so errors are only logged.
#
# @param take_mutex [Boolean] Whether to lock the instance mutex for the
#   duration of the call. Pass false when the caller already holds it.
# @return [Array<Thread>] Threads still moving log files to the bucket
def close_file(take_mutex = true)
  @mutex.lock if take_mutex
  begin
    # Remove old wait_threads. This is done inside the begin block so the
    # ensure below releases the mutex even if pruning raises.
    @wait_threads.select!(&:alive?)

    if @file
      begin
        @file.close unless @file.closed?
        Logger.debug "Log File Closed : #{@filename}"
        # Only try to move the file if we've written data to it
        # This is indicated by the first and last timestamps being set
        if @first_time and @last_time
          date = first_timestamp[0..7] # YYYYMMDD
          bucket_key = File.join(@remote_log_directory, date, bucket_filename())
          # Cleanup timestamps here so they are unset for the next file
          @first_time = nil
          @last_time = nil
          @wait_threads << BucketUtilities.move_log_file_to_bucket(@filename, bucket_key)
          # Now that the file is in storage, trim the Redis stream after a delay.
          # A copy of the offsets is stored because @last_offsets is cleared below.
          @cleanup_offsets << @last_offsets.dup
          @cleanup_times << (Time.now + CLEANUP_DELAY)
          @last_offsets.clear
        end
      rescue Exception => e
        # Deliberately broad: a failed close is logged, never propagated
        Logger.error "Error closing #{@filename} : #{e.formatted}"
      end

      @file = nil
      @file_size = 0
      @filename = nil
    end
  ensure
    @mutex.unlock if take_mutex
  end
  return @wait_threads
end

#create_unique_filename(ext = extension) ⇒ Object

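Implementation details: builds a timestamped filename inside the writer’s temporary directory, adding an increasing attempt number until no existing file has that name.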



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/openc3/logs/log_writer.rb', line 189

# Builds a timestamped filename inside the temporary directory that does not
# collide with an existing file. On a collision a warning is logged and an
# increasing attempt number (1, 2, ...) is added to the filename tags.
#
# @param ext [String] File extension, defaults to the value of #extension
# @return [String] Path of a file that does not currently exist
def create_unique_filename(ext = extension)
  attempt = nil
  loop do
    filename = File.join(@tmp_dir, File.build_timestamped_filename([@label, attempt], ext))
    return filename unless File.exist?(filename)

    # nil.to_i is 0, so the first retry uses attempt number 1
    attempt = attempt.to_i + 1
    Logger.warn("Unexpected file name conflict: #{filename}")
  end
end

#cycle_thread_bodyObject



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/openc3/logs/log_writer.rb', line 206

# Body of the shared "Log cycle" thread. On each pass it visits every
# registered instance and, while holding the class mutex and that instance's
# mutex, closes the current file if a time-based cycle condition is met and
# trims Redis topics whose cleanup time has passed. Passes are spaced
# CYCLE_TIME_INTERVAL seconds apart (less the time the pass itself took).
# The loop exits when the sleeper's sleep returns a truthy value
# (presumably after shutdown cancels the sleeper).
def cycle_thread_body
  @@cycle_sleeper = Sleeper.new
  while true
    start_time = Time.now
    @@mutex.synchronize do
      @@instances.each do |instance|
        # The check against start_time needs to be mutex protected to prevent a packet coming in between the check
        # and closing the file
        instance.mutex.synchronize do
          utc_now = Time.now.utc
          # Logger.debug("start:#{@start_time.to_f} now:#{utc_now.to_f} cycle:#{@cycle_time} new:#{(utc_now - @start_time) > @cycle_time}")
          if instance.logging_enabled and instance.filename # Logging and file opened
            # close_file is called with false because the instance mutex is already held here
            # Cycle based on total time logging
            if (instance.cycle_time and (utc_now - instance.start_time) > instance.cycle_time)
              Logger.debug("Log writer start new file due to cycle time")
              instance.close_file(false)
            # Cycle daily at a specific time
            # (the yday comparison requires the file to have been started on a different day of the year)
            elsif (instance.cycle_hour and instance.cycle_minute and utc_now.hour == instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.yday != utc_now.yday)
              Logger.debug("Log writer start new file daily")
              instance.close_file(false)
            # Cycle hourly at a specific time
            # (the hour comparison requires the file to have been started in a different hour)
            elsif (instance.cycle_minute and not instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.hour != utc_now.hour)
              Logger.debug("Log writer start new file hourly")
              instance.close_file(false)
            end
          end

          # Check for cleanup time
          indexes_to_clear = []
          instance.cleanup_times.each_with_index do |cleanup_time, index|
            if cleanup_time <= utc_now
              # Now that the file is in S3, trim the Redis stream up until the previous file.
              # This keeps one minute of data in Redis
              instance.cleanup_offsets[index].each do |redis_topic, cleanup_offset|
                Topic.trim_topic(redis_topic, cleanup_offset)
              end
              indexes_to_clear << index
            end
          end
          # Processed entries are set to nil and then compacted away so that
          # cleanup_offsets and cleanup_times stay aligned by index
          if indexes_to_clear.length > 0
            indexes_to_clear.each do |index|
              instance.cleanup_offsets[index] = nil
              instance.cleanup_times[index] = nil
            end
            instance.cleanup_offsets.compact!
            instance.cleanup_times.compact!
          end
        end
      end
    end

    # Only check whether to cycle at a set interval
    run_time = Time.now - start_time
    sleep_time = CYCLE_TIME_INTERVAL - run_time
    sleep_time = 0 if sleep_time < 0
    break if @@cycle_sleeper.sleep(sleep_time)
  end
end

#extensionObject



370
371
372
# File 'lib/openc3/logs/log_writer.rb', line 370

# @return [String] Frozen file extension used for log files ('.log')
def extension
  return '.log'.freeze
end

#first_timestampObject



374
375
376
# File 'lib/openc3/logs/log_writer.rb', line 374

# Timestamp string for the first entry time recorded in @first_time.
#
# @return [String] Timestamp in the form "YYYYMMDDHHmmSSNNNNNNNNN"
def first_timestamp
  first_entry_time = Time.from_nsec_from_epoch(@first_time)
  first_entry_time.to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#graceful_killObject



183
184
185
# File 'lib/openc3/logs/log_writer.rb', line 183

# Sets the @cancel_threads flag to true.
# NOTE(review): nothing in the visible code reads this flag; presumably it is
# checked elsewhere (e.g. by thread shutdown helpers) - confirm.
def graceful_kill
  @cancel_threads = true
end

#last_timestampObject



378
379
380
# File 'lib/openc3/logs/log_writer.rb', line 378

# Timestamp string for the last entry time recorded in @last_time.
#
# @return [String] Timestamp in the form "YYYYMMDDHHmmSSNNNNNNNNN"
def last_timestamp
  last_entry_time = Time.from_nsec_from_epoch(@last_time)
  last_entry_time.to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#prepare_write(time_nsec_since_epoch, data_length, redis_topic = nil, redis_offset = nil, allow_new_file: true, process_out_of_order: true) ⇒ Object

When process_out_of_order is false, the timestamp of the current entry is ignored for time-order checking (used to ignore timestamps on metadata entries, as opposed to actual packets).



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/openc3/logs/log_writer.rb', line 290

# Called before an entry is written. Starts a new file when none is open or
# when the entry would push the file past cycle_size, reports (once per file)
# entries whose time is earlier than the previous one, and records the latest
# Redis offset per topic. Assumes the instance mutex is already held.
#
# @param time_nsec_since_epoch [Integer] Entry time in nanoseconds since epoch
# @param data_length [Integer] Size in bytes of the entry about to be written
# @param redis_topic [String, nil] Redis topic the entry came from
# @param redis_offset [String, nil] Redis offset of the entry
# @param allow_new_file [Boolean] Whether a new file may be started
# @param process_out_of_order [Boolean] When false the entry's time is neither
#   checked against nor stored as the previous entry time
def prepare_write(time_nsec_since_epoch, data_length, redis_topic = nil, redis_offset = nil, allow_new_file: true, process_out_of_order: true)
  # This check includes logging_enabled again because it might have changed since we acquired the mutex
  # Ensures new files based on size, and ensures always increasing time order in files
  if @logging_enabled
    no_file = !@file
    too_big = !no_file && @cycle_size && ((@file_size + data_length) > @cycle_size)
    if no_file
      Logger.debug("Log writer start new file because no file opened")
      start_new_file() if allow_new_file
    elsif too_big
      Logger.debug("Log writer start new file due to cycle size #{@cycle_size}")
      start_new_file() if allow_new_file
    elsif process_out_of_order && @enforce_time_order
      previous = @previous_time_nsec_since_epoch
      # Warning: Creating new files here can cause lots of files to be created if packets make it through out of order
      # Changed to just a error to prevent file thrashing
      if previous && (previous > time_nsec_since_epoch) && !@out_of_order
        Logger.error("Log writer out of order time detected (increase buffer depth?): #{Time.from_nsec_from_epoch(@previous_time_nsec_since_epoch)} #{Time.from_nsec_from_epoch(time_nsec_since_epoch)}")
        @out_of_order = true
      end
    end
  end
  # This is needed for the redis offset marker entry at the end of the log file
  @last_offsets[redis_topic] = redis_offset if redis_topic && redis_offset
  @previous_time_nsec_since_epoch = time_nsec_since_epoch if process_out_of_order
end

#shutdownObject

Stop all logging, close the current log file, and kill the logging threads.



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/openc3/logs/log_writer.rb', line 163

# Stop all logging, close the current log file, and kill the logging threads.
# The shared cycle thread is only stopped once the last registered instance
# has been removed.
#
# @return The value returned by #stop
def shutdown
  threads = stop()
  @@mutex.synchronize do
    @@instances.delete(self)
    if @@instances.empty?
      @@cycle_sleeper&.cancel
      OpenC3.kill_thread(self, @@cycle_thread) if @@cycle_thread
      @@cycle_thread = nil
    end
  end
  threads
end

#startObject

Starts a new log file by closing the existing log file. New log files are not created until packets are written by #write so this does not immediately create a log file on the filesystem.



152
153
154
# File 'lib/openc3/logs/log_writer.rb', line 152

# Starts a new log file by closing the existing log file and enabling
# logging. No file is created here.
def start
  @mutex.synchronize do
    # The mutex is already held, so close_file must not take it again
    close_file(false)
    @logging_enabled = true
  end
end

#start_new_fileObject

Starting a new log file is a critical operation, so the entire method is wrapped with a rescue and errors are handled with handle_critical_exception. Assumes the mutex has already been taken.



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/openc3/logs/log_writer.rb', line 268

# Closes any open file and opens a fresh, uniquely named one for binary
# writing, resetting all per-file state. Starting a new log file is a
# critical operation: any error is logged, logging is disabled, and the
# error is passed to OpenC3.handle_critical_exception.
# Assumes mutex has already been taken.
def start_new_file
  close_file(false) if @file

  # Start log file
  @filename = create_unique_filename()
  @file = File.open(@filename, 'wb')
  @file_size = 0
  @start_time = Time.now.utc

  # Reset the per-file time tracking
  @first_time = @last_time = @previous_time_nsec_since_epoch = nil
  @out_of_order = false
  Logger.debug "Log File Opened : #{@filename}"
rescue => e
  Logger.error "Error starting new log file: #{e.formatted}"
  @logging_enabled = false
  OpenC3.handle_critical_exception(e)
end

#stopObject

Stops all logging and closes the current log file.



157
158
159
160
# File 'lib/openc3/logs/log_writer.rb', line 157

# Stops all logging and closes the current log file.
#
# @return [Array<Thread>] The instance's list of bucket-move threads
def stop
  @mutex.synchronize do
    # The mutex is already held, so close_file must not take it again
    close_file(false)
    @logging_enabled = false
  end
  @wait_threads
end