Class: BucketFileCache

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_DISK_USAGE =

Default 20 GB

(ENV['OPENC3_BUCKET_FILE_CACHE_SIZE'] || 20_000_000_000).to_i
CHECK_TIME_SECONDS =
3600
@@instance =
nil
@@mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBucketFileCache

Returns a new instance of BucketFileCache.



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 166

def initialize
  # Create local file cache location
  @cache_dir = Dir.mktmpdir
  FileUtils.mkdir_p(@cache_dir)
  at_exit do
    FileUtils.remove_dir(@cache_dir, true)
  end

  @current_disk_usage = 0
  @queued_bucket_files = []
  @bucket_file_hash = {}
  bucket_file = nil

  @thread = Thread.new do
    client = OpenC3::Bucket.getClient()
    while true
      check_time = Time.now + CHECK_TIME_SECONDS # Check for aged out files periodically
      if @queued_bucket_files.length > 0 and @current_disk_usage < MAX_DISK_USAGE
        @@mutex.synchronize do
          bucket_file = @queued_bucket_files.shift
        end
        begin
          retrieved = bucket_file.retrieve(client)
          @@mutex.synchronize do
            @current_disk_usage += bucket_file.size if retrieved
          end
        rescue
          # Might have been deleted
        end
        sleep(0.01) # Small throttle
      else
        # Nothing to do or disk full
        if Time.now > check_time
          check_time = Time.now + CHECK_TIME_SECONDS
          # Delete any files that aren't reserved and are old
          removed_files = []
          @bucket_file_hash.each do |bucket_path, bucket_file|
            deleted = bucket_file.age_check
            if deleted
              removed_files << bucket_path
              @current_disk_usage -= bucket_file.size
            end
          end
          removed_files.each do |bucket_path|
            @bucket_file_hash.delete(bucket_path)
          end
        end
        sleep(1)
      end
    end
  rescue => err
    OpenC3::Logger.error "BucketFileCache thread unexpectedly died\n#{err.formatted}"
  end
end

Instance Attribute Details

#cache_dirObject (readonly)

Returns the value of attribute cache_dir.



153
154
155
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 153

def cache_dir
  @cache_dir
end

Class Method Details

.hint(bucket_paths) ⇒ Object



221
222
223
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 221

def self.hint(bucket_paths)
  return instance().hint(bucket_paths)
end

.instanceObject



158
159
160
161
162
163
164
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 158

def self.instance
  return @@instance if @@instance
  @@mutex.synchronize do
    @@instance ||= BucketFileCache.new
  end
  @@instance
end

.reserve(bucket_path) ⇒ Object



225
226
227
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 225

def self.reserve(bucket_path)
  return instance().reserve(bucket_path)
end

.unreserve(bucket_path) ⇒ Object



229
230
231
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 229

def self.unreserve(bucket_path)
  return instance().unreserve(bucket_path)
end

Instance Method Details

#create_bucket_file(bucket_path) ⇒ Object

Private



268
269
270
271
272
273
274
275
276
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 268

def create_bucket_file(bucket_path)
  bucket_file = @bucket_file_hash[bucket_path]
  unless bucket_file
    bucket_file = BucketFile.new(bucket_path)
    @queued_bucket_files << bucket_file
    @bucket_file_hash[bucket_path] = bucket_file
  end
  return bucket_file
end

#hint(bucket_paths) ⇒ Object



233
234
235
236
237
238
239
240
241
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 233

def hint(bucket_paths)
  @@mutex.synchronize do
    bucket_paths.each_with_index do |bucket_path, index|
      bucket_file = create_bucket_file(bucket_path)
      bucket_file.priority = index
    end
    @queued_bucket_files.sort! {|file1, file2| file1.priority <=> file2.priority}
  end
end

#reserve(bucket_path) ⇒ Object



243
244
245
246
247
248
249
250
251
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 243

def reserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = create_bucket_file(bucket_path)
    retrieved = bucket_file.reserve
    @current_disk_usage += bucket_file.size if retrieved
    @queued_bucket_files.delete(bucket_file)
    return bucket_file
  end
end

#unreserve(bucket_path) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 253

def unreserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = @bucket_file_hash[bucket_path]
    if bucket_file
      bucket_file.unreserve
      if bucket_file.reservation_count <= 0 and !@queued_bucket_files.include?(bucket_file)
        @current_disk_usage -= bucket_file.size
        @bucket_file_hash.delete(bucket_file)
      end
    end
  end
end