Class: BucketFile
Constant Summary collapse
- MAX_AGE_SECONDS = 3600 * 4 (4 hours)
Instance Attribute Summary collapse
-
#bucket_path ⇒ Object
readonly
Returns the value of attribute bucket_path.
-
#error ⇒ Object
readonly
Returns the value of attribute error.
-
#init_time ⇒ Object
readonly
Returns the value of attribute init_time.
-
#local_path ⇒ Object
readonly
Returns the value of attribute local_path.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#reservation_count ⇒ Object
readonly
Returns the value of attribute reservation_count.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#topic_prefix ⇒ Object
readonly
Returns the value of attribute topic_prefix.
Instance Method Summary collapse
- #age_check ⇒ Object
-
#delete ⇒ Object
private.
-
#initialize(bucket_path) ⇒ BucketFile
constructor
A new instance of BucketFile.
- #reserve ⇒ Object
-
#retrieve(client = @bucket, uncompress = true) ⇒ Object
Returns true if the file was retrieved and added to the disk. Returns false if the file already exists. Raises an error on retrieval errors.
- #unreserve ⇒ Object
Constructor Details
#initialize(bucket_path) ⇒ BucketFile
Returns a new instance of BucketFile.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 41
# Builds a BucketFile for the given bucket key and derives the topic prefix
# from the "/"-separated segments of that key.
#
# Segment 0 is used as the scope, segment 1 supplies the stream mode (its
# first "_"-delimited word, plus the second word when the first is REDUCED),
# segment 2 selects command vs telemetry, and segment 3 is the target name.
# Missing segments are treated as empty strings.
#
# @param bucket_path [String] key of the file inside the bucket
def initialize(bucket_path)
  @bucket = OpenC3::Bucket.getClient()
  @bucket_path = bucket_path
  @local_path = nil
  @reservation_count = 0
  @size = 0
  @priority = 0
  @error = nil
  @init_time = Time.now
  @mutex = Mutex.new

  scope_part, stream_part, cmd_tlm_part, target_part = @bucket_path.split("/")
  scope = scope_part.to_s.upcase
  cmd_or_tlm = cmd_tlm_part.to_s.upcase
  target_name = target_part.to_s.upcase

  stream_words = stream_part.to_s.split("_")
  stream_mode = stream_words[0].to_s.upcase
  if stream_mode == 'REDUCED'
    # Reduced streams carry their period in the second word
    stream_mode = "#{stream_mode}_#{stream_words[1].to_s.upcase}"
  end

  is_cmd = (cmd_or_tlm == 'CMD')
  type =
    case stream_mode
    when 'RAW'
      is_cmd ? 'COMMAND' : 'TELEMETRY'
    when 'DECOM'
      is_cmd ? 'DECOMCMD' : 'DECOM'
    else
      stream_mode # REDUCED_MINUTE, REDUCED_HOUR, or REDUCED_DAY
    end

  @topic_prefix = "#{scope}__#{type}__{#{target_name}}"
end
Instance Attribute Details
#bucket_path ⇒ Object (readonly)
Returns the value of attribute bucket_path.
32 33 34 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 32 def bucket_path @bucket_path end |
#error ⇒ Object (readonly)
Returns the value of attribute error.
36 37 38 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 36 def error @error end |
#init_time ⇒ Object (readonly)
Returns the value of attribute init_time.
38 39 40 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 38 def init_time @init_time end |
#local_path ⇒ Object (readonly)
Returns the value of attribute local_path.
33 34 35 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 33 def local_path @local_path end |
#priority ⇒ Object
Returns the value of attribute priority.
39 40 41 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 39 def priority @priority end |
#reservation_count ⇒ Object (readonly)
Returns the value of attribute reservation_count.
34 35 36 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 34 def reservation_count @reservation_count end |
#size ⇒ Object (readonly)
Returns the value of attribute size.
35 36 37 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 35 def size @size end |
#topic_prefix ⇒ Object (readonly)
Returns the value of attribute topic_prefix.
37 38 39 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 37 def topic_prefix @topic_prefix end |
Instance Method Details
#age_check ⇒ Object
128 129 130 131 132 133 134 135 136 137 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 128
# Deletes the local copy when this file is older than MAX_AGE_SECONDS and
# nothing holds a reservation on it.
#
# @return [Boolean] true if the file was considered expired and delete was
#   invoked, false otherwise
def age_check
  @mutex.synchronize do
    age_seconds = Time.now - @init_time
    expired = age_seconds > MAX_AGE_SECONDS && @reservation_count <= 0
    delete if expired
    expired
  end
end
#delete ⇒ Object
private
141 142 143 144 145 146 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 141
# Removes the local copy from disk and forgets its path. Does nothing when
# no local path is recorded or the file is no longer present.
#
# @return [nil]
def delete
  return unless @local_path && File.exist?(@local_path)

  File.delete(@local_path)
  @local_path = nil
end
#reserve ⇒ Object
115 116 117 118 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 115
# Registers one more reservation on this file and makes sure it is present
# locally.
#
# @return [Boolean] the result of retrieve
def reserve
  @mutex.synchronize do
    @reservation_count += 1
  end
  # retrieve takes @mutex itself, so it must be called after the lock is released
  retrieve
end
#retrieve(client = @bucket, uncompress = true) ⇒ Object
Returns true if the file was retrieved and added to the disk. Returns false if the file already exists. Raises an error on retrieval errors.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 74
# Downloads this bucket file into the cache directory unless a local copy
# already exists.
#
# @param client [Object] bucket client responding to
#   get_object(bucket:, key:, path:); defaults to the client held in @bucket
# @param uncompress [Boolean] when true, a downloaded ".gz" file is expanded
#   with OpenC3::BucketUtilities.uncompress_file and the compressed copy is
#   deleted
# @return [Boolean] true if the file was retrieved and added to the disk,
#   false if the file already exists
# @raise [StandardError] any retrieval error, after it is stored in @error
#   and logged
def retrieve(client = @bucket, uncompress = true)
  @mutex.synchronize do
    # A previous call may have replaced the download with an uncompressed copy
    # under a different name (the ".gz" is deleted below). Check the recorded
    # path first so that file is not downloaded and uncompressed again.
    return false if @local_path && File.exist?(@local_path)

    local_path = "#{BucketFileCache.instance.cache_dir}/#{File.basename(@bucket_path)}"
    unless File.exist?(local_path)
      OpenC3::Logger.debug "Retrieving #{@bucket_path} from logs bucket"
      retry_count = 0
      begin
        client_result = client.get_object(bucket: ENV['OPENC3_LOGS_BUCKET'], key: @bucket_path, path: local_path)
        unless File.exist?(local_path)
          raise "Local file does not exist after get_object: #{client_result.inspect}"
        end
      rescue => err
        # Try to retrieve the file three times
        retry_count += 1
        raise err if retry_count >= 3
        # NOTE(review): err.formatted is not core Ruby - presumably an OpenC3 extension
        OpenC3::Logger.warn("Error retrieving log file from bucket - retry #{retry_count}: #{@bucket_path}\n#{err.formatted}")
        sleep(1)
        retry
      end

      if File.exist?(local_path)
        basename = File.basename(local_path)
        if uncompress && File.extname(basename) == ".gz"
          uncompressed = OpenC3::BucketUtilities.uncompress_file(local_path)
          File.delete(local_path)
          local_path = uncompressed
        end
        @size = File.size(local_path)
        @local_path = local_path
        return true
      end
    end
    return false
  end
rescue => err
  # Remember the failure for callers that inspect #error, then propagate it
  @error = err
  OpenC3::Logger.error "Failed to retrieve #{@bucket_path}\n#{err.formatted}"
  raise err
end
#unreserve ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 120
# Releases one reservation and deletes the local copy once no reservations
# remain.
#
# @return [Integer] the reservation count after the decrement
def unreserve
  @mutex.synchronize do
    remaining = (@reservation_count -= 1)
    delete unless remaining > 0
    remaining
  end
end