Class: BucketFile
Constant Summary collapse
- MAX_AGE_SECONDS =
Maximum age of a cached file before age_check may delete it: 4 hours, expressed in seconds.
3600 * 4
Instance Attribute Summary collapse
-
#bucket_path ⇒ Object
readonly
Returns the value of attribute bucket_path.
-
#error ⇒ Object
readonly
Returns the value of attribute error.
-
#init_time ⇒ Object
readonly
Returns the value of attribute init_time.
-
#local_path ⇒ Object
readonly
Returns the value of attribute local_path.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#reservation_count ⇒ Object
readonly
Returns the value of attribute reservation_count.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#topic_prefix ⇒ Object
readonly
Returns the value of attribute topic_prefix.
Instance Method Summary collapse
- #age_check ⇒ Object
-
#delete ⇒ Object
private.
-
#initialize(bucket_path) ⇒ BucketFile
constructor
A new instance of BucketFile.
- #reserve ⇒ Object
-
#retrieve(client = @bucket, uncompress = true) ⇒ Object
Returns true if the file was retrieved and added to the disk. Returns false if the file already exists. Raises an error on retrieval errors.
- #unreserve ⇒ Object
Constructor Details
#initialize(bucket_path) ⇒ BucketFile
Returns a new instance of BucketFile.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 36
#
# Builds the bookkeeping state for one bucket object and derives its topic
# prefix from the first four "/"-separated components of the bucket path.
#
# @param bucket_path [String] key of the object inside the bucket; read as
#   "<scope>/<stream mode>_.../<cmd or tlm>/<target name>/..."
def initialize(bucket_path)
  @bucket = OpenC3::Bucket.getClient
  @bucket_path = bucket_path
  @local_path = nil
  @reservation_count = 0
  @size = 0
  @priority = 0
  @error = nil
  @init_time = Time.now
  @mutex = Mutex.new

  # Missing path components come back as nil, hence the to_s before upcase.
  scope_part, mode_part, direction_part, target_part = @bucket_path.split("/").first(4)
  scope = scope_part.to_s.upcase
  stream_mode = mode_part.to_s.split("_")[0].to_s.upcase
  is_cmd = direction_part.to_s.upcase == 'CMD'
  target_name = target_part.to_s.upcase

  # type stays nil when the stream mode is neither RAW nor DECOM.
  type =
    case stream_mode
    when 'RAW' then is_cmd ? 'COMMAND' : 'TELEMETRY'
    when 'DECOM' then is_cmd ? 'DECOMCMD' : 'DECOM'
    end

  @topic_prefix = "#{scope}__#{type}__{#{target_name}}"
end
Instance Attribute Details
#bucket_path ⇒ Object (readonly)
Returns the value of attribute bucket_path.
27 28 29 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 27
#
# @return [String] the bucket path this instance was constructed with
def bucket_path
  @bucket_path
end
#error ⇒ Object (readonly)
Returns the value of attribute error.
31 32 33 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 31
#
# @return [StandardError, nil] the error recorded by a failed retrieve, or nil
#   (the constructor's initial value)
def error
  @error
end
#init_time ⇒ Object (readonly)
Returns the value of attribute init_time.
33 34 35 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 33
#
# @return [Time] the time captured when this instance was constructed
def init_time
  @init_time
end
#local_path ⇒ Object (readonly)
Returns the value of attribute local_path.
28 29 30 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 28
#
# @return [String, nil] path of the cached copy on disk as set by retrieve;
#   nil before a retrieval and after delete removes the file
def local_path
  @local_path
end
#priority ⇒ Object
Returns the value of attribute priority.
34 35 36 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 34
#
# @return [Object] the current priority; the constructor starts it at 0
def priority
  @priority
end
#reservation_count ⇒ Object (readonly)
Returns the value of attribute reservation_count.
29 30 31 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 29
#
# @return [Integer] the reservation counter: incremented by reserve,
#   decremented by unreserve
def reservation_count
  @reservation_count
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
30 31 32 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 30
#
# @return [Integer] size in bytes of the cached file as measured by retrieve;
#   0 until a retrieval has completed
def size
  @size
end
#topic_prefix ⇒ Object (readonly)
Returns the value of attribute topic_prefix.
32 33 34 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 32
#
# @return [String] the "<SCOPE>__<TYPE>__{<TARGET>}" prefix built from the
#   bucket path by the constructor
def topic_prefix
  @topic_prefix
end
Instance Method Details
#age_check ⇒ Object
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 118
#
# Deletes the cached file once it is older than MAX_AGE_SECONDS and nothing
# holds a reservation on it. The check and the delete run under the mutex.
#
# @return [Boolean] true if the file was deleted, false if it was kept
def age_check
  @mutex.synchronize do
    expired = (Time.now - @init_time) > MAX_AGE_SECONDS
    return false unless expired && @reservation_count <= 0

    delete
    true
  end
end
#delete ⇒ Object
private
131 132 133 134 135 136 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 131
#
# Removes the cached copy from disk and forgets its path. Does nothing when no
# path is recorded or the recorded file is already gone (the path is then
# left as-is).
#
# @return [nil]
def delete
  return unless @local_path && File.exist?(@local_path)

  File.delete(@local_path)
  @local_path = nil
end
#reserve ⇒ Object
105 106 107 108 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 105
#
# Registers one more user of this file, then makes sure it is on disk.
# Only the counter update runs under the mutex; retrieve takes the mutex itself.
#
# @return [Boolean] whatever retrieve returns
def reserve
  @mutex.synchronize do
    @reservation_count += 1
  end
  retrieve
end
#retrieve(client = @bucket, uncompress = true) ⇒ Object
Returns true if the file was retrieved and added to the disk. Returns false if the file already exists. Raises an error on retrieval errors.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 64
#
# Downloads the bucket object at @bucket_path into the cache directory,
# optionally uncompressing a ".gz" download, and records its path and size.
#
# @param client [Object] bucket client responding to
#   get_object(bucket:, key:, path:); defaults to the client created by the
#   constructor
# @param uncompress [Boolean] when true, a download whose name ends in ".gz"
#   is uncompressed and the compressed copy is deleted
# @return [Boolean] true if the file was retrieved and added to the disk,
#   false if the file already exists
# @raise [StandardError] the retrieval error, after it is stored in @error
#   and logged
def retrieve(client = @bucket, uncompress = true)
  @mutex.synchronize do
    # An earlier call may have swapped the ".gz" download for its uncompressed
    # form, so the bucket basename is no longer on disk. Check the recorded
    # path first; otherwise every call would download the object again.
    return false if @local_path && File.exist?(@local_path)

    local_path = "#{BucketFileCache.instance.cache_dir}/#{File.basename(@bucket_path)}"
    return false if File.exist?(local_path)

    OpenC3::Logger.debug "Retrieving #{@bucket_path} from logs bucket"
    retry_count = 0
    begin
      client_result = client.get_object(bucket: ENV['OPENC3_LOGS_BUCKET'], key: @bucket_path, path: local_path)
      unless File.exist?(local_path)
        raise "Local file does not exist after get_object: #{client_result.inspect}"
      end
    rescue => err
      # Try to retrieve the file three times
      retry_count += 1
      raise err if retry_count >= 3
      OpenC3::Logger.warn("Error retrieving log file from bucket - retry #{retry_count}: #{@bucket_path}\n#{err.formatted}")
      sleep(1)
      retry
    end

    # Kept from the original flow: report "not retrieved" if the file vanished.
    return false unless File.exist?(local_path)

    if uncompress && File.extname(File.basename(local_path)) == ".gz"
      uncompressed = OpenC3::BucketUtilities.uncompress_file(local_path)
      File.delete(local_path)
      local_path = uncompressed
    end
    @size = File.size(local_path)
    @local_path = local_path
    true
  end
rescue => err
  @error = err
  OpenC3::Logger.error "Failed to retrieve #{@bucket_path}\n#{err.formatted}"
  raise err
end
#unreserve ⇒ Object
110 111 112 113 114 115 116 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 110
#
# Releases one reservation and deletes the cached file once the counter
# reaches zero or below. Runs entirely under the mutex.
#
# @return [Integer] the reservation count after the decrement
def unreserve
  @mutex.synchronize do
    remaining = (@reservation_count -= 1)
    delete unless remaining > 0
    @reservation_count
  end
end