Class: BucketFileCache

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_DISK_USAGE =

Default 20 GB

(ENV['OPENC3_BUCKET_FILE_CACHE_SIZE'] || 20_000_000_000).to_i
CHECK_TIME_SECONDS =
3600
@@instance =
nil
@@mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BucketFileCache

Returns a new instance of BucketFileCache.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 156

# Sets up the cache: creates a temporary local cache directory (removed at
# process exit), initializes the bookkeeping structures, and starts a
# background thread that retrieves queued bucket files and periodically
# runs an age check over the tracked files.
def initialize
  # Create local file cache location
  @cache_dir = Dir.mktmpdir
  FileUtils.mkdir_p(@cache_dir)
  at_exit do
    FileUtils.remove_dir(@cache_dir, true)
  end

  @current_disk_usage = 0
  @queued_bucket_files = []
  @bucket_file_hash = {}

  @thread = Thread.new do
    client = OpenC3::Bucket.getClient()
    # Deadline for the next age check. This must be set once, outside the
    # loop: recomputing it on every pass keeps the deadline permanently in
    # the future, so the age check below would never run.
    check_time = Time.now + CHECK_TIME_SECONDS
    while true
      if !@queued_bucket_files.empty? && @current_disk_usage < MAX_DISK_USAGE
        bucket_file = @@mutex.synchronize { @queued_bucket_files.shift }
        # The queue can be emptied by #reserve between the check above and
        # the shift, in which case there is nothing to retrieve.
        if bucket_file
          begin
            retrieved = bucket_file.retrieve(client)
            @@mutex.synchronize do
              @current_disk_usage += bucket_file.size if retrieved
            end
          rescue
            # Might have been deleted
          end
        end
        sleep(0.01) # Small throttle
      else
        # Nothing to do or disk full
        if Time.now > check_time
          check_time = Time.now + CHECK_TIME_SECONDS
          # Hold the mutex for the whole sweep: hint/reserve/unreserve
          # modify @bucket_file_hash and @current_disk_usage under the same
          # mutex from other threads.
          @@mutex.synchronize do
            # Delete any files that aren't reserved and are old
            removed_files = []
            @bucket_file_hash.each do |bucket_path, cached_file|
              if cached_file.age_check
                removed_files << bucket_path
                @current_disk_usage -= cached_file.size
              end
            end
            # Remove entries after iterating; a Hash must not be modified
            # while it is being iterated.
            removed_files.each do |bucket_path|
              @bucket_file_hash.delete(bucket_path)
            end
          end
        end
        sleep(1)
      end
    end
  rescue => err
    OpenC3::Logger.error "BucketFileCache thread unexpectedly died\n#{err.formatted}"
  end
end

Instance Attribute Details

#cache_dir ⇒ Object (readonly)

Returns the value of attribute cache_dir.



143
144
145
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 143

# Reader for the local cache directory path held in @cache_dir.
#
# @return [Object] the current value of @cache_dir
def cache_dir
  return @cache_dir
end

Class Method Details

.hint(bucket_paths) ⇒ Object



211
212
213
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 211

# Class-level convenience wrapper: forwards to #hint on the shared instance.
#
# @param bucket_paths the paths handed through to the instance method
# @return whatever the instance-level #hint returns
def self.hint(bucket_paths)
  instance.hint(bucket_paths)
end

.instance ⇒ Object



148
149
150
151
152
153
154
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 148

# Returns the shared BucketFileCache, creating it on first use.
#
# The already-created case is answered without taking the mutex; creation
# itself happens inside the mutex and re-checks @@instance so only one
# object is ever built.
#
# @return [BucketFileCache] the shared instance
def self.instance
  unless @@instance
    @@mutex.synchronize do
      @@instance ||= BucketFileCache.new
    end
  end
  @@instance
end

.reserve(bucket_path) ⇒ Object



215
216
217
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 215

# Class-level convenience wrapper: forwards to #reserve on the shared
# instance.
#
# @param bucket_path the path handed through to the instance method
# @return whatever the instance-level #reserve returns
def self.reserve(bucket_path)
  instance.reserve(bucket_path)
end

.unreserve(bucket_path) ⇒ Object



219
220
221
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 219

# Class-level convenience wrapper: forwards to #unreserve on the shared
# instance.
#
# @param bucket_path the path handed through to the instance method
# @return whatever the instance-level #unreserve returns
def self.unreserve(bucket_path)
  instance.unreserve(bucket_path)
end

Instance Method Details

#create_bucket_file(bucket_path) ⇒ Object

Private



258
259
260
261
262
263
264
265
266
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 258

# Looks up the BucketFile tracked for bucket_path, creating one when none
# exists yet. A newly created file is appended to the retrieval queue and
# registered in the path => file hash.
#
# Does no locking itself; the callers visible here (#hint, #reserve) invoke
# it while holding @@mutex.
#
# @param bucket_path key used in @bucket_file_hash
# @return the existing or newly created BucketFile
def create_bucket_file(bucket_path)
  existing = @bucket_file_hash[bucket_path]
  return existing if existing

  created = BucketFile.new(bucket_path)
  @queued_bucket_files.push(created)
  @bucket_file_hash[bucket_path] = created
  created
end

#hint(bucket_paths) ⇒ Object



223
224
225
226
227
228
229
230
231
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 223

# Registers the given paths for retrieval and orders the queue.
#
# Each path gets a BucketFile (created if needed) whose priority is set to
# its position in bucket_paths; the retrieval queue is then re-sorted by
# ascending priority. Everything runs under @@mutex.
#
# @param bucket_paths enumerable of bucket paths, most wanted first
# @return the sorted retrieval queue (the value of the synchronized block)
def hint(bucket_paths)
  @@mutex.synchronize do
    bucket_paths.each_with_index do |path, position|
      create_bucket_file(path).priority = position
    end
    @queued_bucket_files.sort! do |left, right|
      left.priority <=> right.priority
    end
  end
end

#reserve(bucket_path) ⇒ Object



233
234
235
236
237
238
239
240
241
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 233

# Reserves the BucketFile for bucket_path and hands it back.
#
# Under @@mutex: fetches or creates the tracked file, calls its #reserve,
# adds the file's size to the disk-usage total when #reserve returns a
# truthy value, and removes the file from the retrieval queue.
#
# @param bucket_path path of the file to reserve
# @return the reserved BucketFile
def reserve(bucket_path)
  reserved_file = nil
  @@mutex.synchronize do
    reserved_file = create_bucket_file(bucket_path)
    newly_retrieved = reserved_file.reserve
    @current_disk_usage += reserved_file.size if newly_retrieved
    @queued_bucket_files.delete(reserved_file)
  end
  reserved_file
end

#unreserve(bucket_path) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 243

# Releases one reservation on the BucketFile tracked for bucket_path.
#
# Under @@mutex: if a file is tracked for the path, its #unreserve is
# called. When its reservation count has dropped to zero or below and it is
# not waiting in the retrieval queue, its size is subtracted from the
# disk-usage total and it is dropped from the tracking hash. Unknown paths
# are ignored.
#
# @param bucket_path path of the file to release
# @return [nil]
def unreserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = @bucket_file_hash[bucket_path]
    if bucket_file
      bucket_file.unreserve
      if bucket_file.reservation_count <= 0 and !@queued_bucket_files.include?(bucket_file)
        @current_disk_usage -= bucket_file.size
        # @bucket_file_hash is keyed by bucket path, so the entry has to be
        # deleted by path. Passing the BucketFile object never matches a
        # key and leaves the stale entry in the hash.
        @bucket_file_hash.delete(bucket_path)
      end
    end
    # Keep the return value nil, as it was before the delete-by-path fix.
    nil
  end
end