Class: BucketFile

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_AGE_SECONDS =

4 hours

3600 * 4

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket_path) ⇒ BucketFile

Returns a new instance of BucketFile.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 41

def initialize(bucket_path)
  @bucket = OpenC3::Bucket.getClient()
  @bucket_path = bucket_path
  @local_path = nil
  @reservation_count = 0
  @size = 0
  @priority = 0
  @error = nil
  @init_time = Time.now
  @mutex = Mutex.new
  path_split = @bucket_path.split("/")
  scope = path_split[0].to_s.upcase
  stream_mode = path_split[1].to_s.split("_")[0].to_s.upcase
  if stream_mode == 'REDUCED'
    stream_mode << '_' << path_split[1].to_s.split("_")[1].to_s.upcase
  end
  cmd_or_tlm = path_split[2].to_s.upcase
  target_name = path_split[3].to_s.upcase
  if stream_mode == 'RAW'
    type = (cmd_or_tlm == 'CMD') ? 'COMMAND' : 'TELEMETRY'
  else
    if stream_mode == 'DECOM'
      type = (cmd_or_tlm == 'CMD') ? 'DECOMCMD' : 'DECOM'
    else
      type = stream_mode # REDUCED_MINUTE, REDUCED_HOUR, or REDUCED_DAY
    end
  end
  @topic_prefix = "#{scope}__#{type}__{#{target_name}}"
end

Instance Attribute Details

#bucket_pathObject (readonly)

Returns the value of attribute bucket_path.



32
33
34
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 32

def bucket_path
  @bucket_path
end

#errorObject (readonly)

Returns the value of attribute error.



36
37
38
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 36

def error
  @error
end

#init_timeObject (readonly)

Returns the value of attribute init_time.



38
39
40
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 38

def init_time
  @init_time
end

#local_pathObject (readonly)

Returns the value of attribute local_path.



33
34
35
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 33

def local_path
  @local_path
end

#priorityObject

Returns the value of attribute priority.



39
40
41
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 39

def priority
  @priority
end

#reservation_countObject (readonly)

Returns the value of attribute reservation_count.



34
35
36
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 34

def reservation_count
  @reservation_count
end

#sizeObject (readonly)

Returns the value of attribute size.



35
36
37
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 35

def size
  @size
end

#topic_prefixObject (readonly)

Returns the value of attribute topic_prefix.



37
38
39
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 37

def topic_prefix
  @topic_prefix
end

Instance Method Details

#age_checkObject



128
129
130
131
132
133
134
135
136
137
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 128

def age_check
  @mutex.synchronize do
    if (Time.now - @init_time) > MAX_AGE_SECONDS and @reservation_count <= 0
      delete()
      return true
    else
      return false
    end
  end
end

#deleteObject

private



141
142
143
144
145
146
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 141

def delete
  if @local_path and File.exist?(@local_path)
    File.delete(@local_path)
    @local_path = nil
  end
end

#reserveObject



115
116
117
118
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 115

def reserve
  @mutex.synchronize { @reservation_count += 1 }
  return retrieve()
end

#retrieve(client = @bucket, uncompress = true) ⇒ Object

Returns true if the file was retrieved and added to the disk Returns false if the file already exists Raises an error on retrieval errors



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 74

def retrieve(client = @bucket, uncompress = true)
  @mutex.synchronize do
    local_path = "#{BucketFileCache.instance.cache_dir}/#{File.basename(@bucket_path)}"
    unless File.exist?(local_path)
      OpenC3::Logger.debug "Retrieving #{@bucket_path} from logs bucket"

      retry_count = 0
      begin
        client_result = client.get_object(bucket: ENV['OPENC3_LOGS_BUCKET'], key: @bucket_path, path: local_path)
        unless File.exist?(local_path)
          raise "Local file does not exist after get_object: #{client_result.inspect}"
        end
      rescue => err
        # Try to retrieve the file three times
        retry_count += 1
        raise err if retry_count >= 3
        OpenC3::Logger.warn("Error retrieving log file from bucket - retry #{retry_count}: #{@bucket_path}\n#{err.formatted}")
        sleep(1)
        retry
      end

      if File.exist?(local_path)
        basename = File.basename(local_path)
        if uncompress and File.extname(basename) == ".gz"
          uncompressed = OpenC3::BucketUtilities.uncompress_file(local_path)
          File.delete(local_path)
          local_path = uncompressed
        end
        @size = File.size(local_path)
        @local_path = local_path
        return true
      end
    end
    return false
  end
rescue => err
  @error = err
  OpenC3::Logger.error "Failed to retrieve #{@bucket_path}\n#{err.formatted}"
  raise err
end

#unreserveObject



120
121
122
123
124
125
126
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 120

def unreserve
  @mutex.synchronize do
    @reservation_count -= 1
    delete() if @reservation_count <= 0
    return @reservation_count
  end
end