Class: BucketFile

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_AGE_SECONDS =

4 hours

3600 * 4

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket_path) ⇒ BucketFile

Returns a new instance of BucketFile.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 36

def initialize(bucket_path)
  @bucket = OpenC3::Bucket.getClient()
  @bucket_path = bucket_path
  @local_path = nil
  @reservation_count = 0
  @size = 0
  @priority = 0
  @error = nil
  @init_time = Time.now
  @mutex = Mutex.new
  path_split = @bucket_path.split("/")
  scope = path_split[0].to_s.upcase
  stream_mode = path_split[1].to_s.split("_")[0].to_s.upcase
  cmd_or_tlm = path_split[2].to_s.upcase
  target_name = path_split[3].to_s.upcase
  if stream_mode == 'RAW'
    type = (cmd_or_tlm == 'CMD') ? 'COMMAND' : 'TELEMETRY'
  else
    if stream_mode == 'DECOM'
      type = (cmd_or_tlm == 'CMD') ? 'DECOMCMD' : 'DECOM'
    end
  end
  @topic_prefix = "#{scope}__#{type}__{#{target_name}}"
end

Instance Attribute Details

#bucket_pathObject (readonly)

Returns the value of attribute bucket_path.



27
28
29
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 27

def bucket_path
  @bucket_path
end

#errorObject (readonly)

Returns the value of attribute error.



31
32
33
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 31

def error
  @error
end

#init_timeObject (readonly)

Returns the value of attribute init_time.



33
34
35
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 33

def init_time
  @init_time
end

#local_pathObject (readonly)

Returns the value of attribute local_path.



28
29
30
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 28

def local_path
  @local_path
end

#priorityObject

Returns the value of attribute priority.



34
35
36
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 34

def priority
  @priority
end

#reservation_countObject (readonly)

Returns the value of attribute reservation_count.



29
30
31
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 29

def reservation_count
  @reservation_count
end

#sizeObject (readonly)

Returns the value of attribute size.



30
31
32
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 30

def size
  @size
end

#topic_prefixObject (readonly)

Returns the value of attribute topic_prefix.



32
33
34
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 32

def topic_prefix
  @topic_prefix
end

Instance Method Details

#age_checkObject



118
119
120
121
122
123
124
125
126
127
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 118

def age_check
  @mutex.synchronize do
    if (Time.now - @init_time) > MAX_AGE_SECONDS and @reservation_count <= 0
      delete()
      return true
    else
      return false
    end
  end
end

#deleteObject

private



131
132
133
134
135
136
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 131

def delete
  if @local_path and File.exist?(@local_path)
    File.delete(@local_path)
    @local_path = nil
  end
end

#reserveObject



105
106
107
108
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 105

def reserve
  @mutex.synchronize { @reservation_count += 1 }
  return retrieve()
end

#retrieve(client = @bucket, uncompress = true) ⇒ Object

Returns true if the file was retrieved and added to the disk Returns false if the file already exists Raises an error on retrieval errors



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 64

def retrieve(client = @bucket, uncompress = true)
  @mutex.synchronize do
    local_path = "#{BucketFileCache.instance.cache_dir}/#{File.basename(@bucket_path)}"
    unless File.exist?(local_path)
      OpenC3::Logger.debug "Retrieving #{@bucket_path} from logs bucket"

      retry_count = 0
      begin
        client_result = client.get_object(bucket: ENV['OPENC3_LOGS_BUCKET'], key: @bucket_path, path: local_path)
        unless File.exist?(local_path)
          raise "Local file does not exist after get_object: #{client_result.inspect}"
        end
      rescue => err
        # Try to retrieve the file three times
        retry_count += 1
        raise err if retry_count >= 3
        OpenC3::Logger.warn("Error retrieving log file from bucket - retry #{retry_count}: #{@bucket_path}\n#{err.formatted}")
        sleep(1)
        retry
      end

      if File.exist?(local_path)
        basename = File.basename(local_path)
        if uncompress and File.extname(basename) == ".gz"
          uncompressed = OpenC3::BucketUtilities.uncompress_file(local_path)
          File.delete(local_path)
          local_path = uncompressed
        end
        @size = File.size(local_path)
        @local_path = local_path
        return true
      end
    end
    return false
  end
rescue => err
  @error = err
  OpenC3::Logger.error "Failed to retrieve #{@bucket_path}\n#{err.formatted}"
  raise err
end

#unreserveObject



110
111
112
113
114
115
116
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 110

def unreserve
  @mutex.synchronize do
    @reservation_count -= 1
    delete() if @reservation_count <= 0
    return @reservation_count
  end
end