Class: Tus::Storage::Gridfs
- Inherits: Object
- Defined in: lib/tus/storage/gridfs.rb
Constant Summary
- BATCH_SIZE = 5 * 1024 * 1024
Instance Attribute Summary
- #bucket ⇒ Object (readonly)
  Returns the value of attribute bucket.
- #chunk_size ⇒ Object (readonly)
  Returns the value of attribute chunk_size.
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #prefix ⇒ Object (readonly)
  Returns the value of attribute prefix.
Instance Method Summary
- #concatenate(uid, part_uids, info = {}) ⇒ Object
  Concatenates multiple partial uploads into a single upload, and returns the size of the resulting upload.
- #create_file(uid, info = {}) ⇒ Object
  Creates a file for the specified upload.
- #delete_file(uid, info = {}) ⇒ Object
  Deletes the GridFS file and chunks for the specified upload.
- #expire_files(expiration_date) ⇒ Object
  Deletes the GridFS files and chunks of uploads older than the specified date.
- #get_file(uid, info = {}, range: nil) ⇒ Object
  Returns a Tus::Response object through which data of the specified upload can be retrieved in a streaming fashion.
- #initialize(client:, prefix: "fs", chunk_size: 256*1024) ⇒ Gridfs (constructor)
  Initializes the GridFS storage and creates necessary indexes.
- #patch_file(uid, input, info = {}) ⇒ Object
  Appends data to the specified upload in a streaming fashion, and returns the number of bytes it managed to save.
- #read_info(uid) ⇒ Object
  Returns info of the specified upload.
- #update_info(uid, info) ⇒ Object
  Updates info of the specified upload.
Constructor Details
#initialize(client:, prefix: "fs", chunk_size: 256*1024) ⇒ Gridfs
Initializes the GridFS storage and creates necessary indexes.
# File 'lib/tus/storage/gridfs.rb', line 21

def initialize(client:, prefix: "fs", chunk_size: 256*1024)
  @client     = client
  @prefix     = prefix
  @chunk_size = chunk_size
  @bucket     = client.database.fs(bucket_name: prefix)

  @bucket.send(:ensure_indexes!)
end
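For illustration, a minimal setup sketch (the connection URL and database name are placeholders; the storage is normally handed to the tus server rather than driven by hand):

require "tus/server"
require "tus/storage/gridfs"
require "mongo"

client  = Mongo::Client.new("mongodb://127.0.0.1:27017/mydb")  # placeholder connection
storage = Tus::Storage::Gridfs.new(client: client, prefix: "fs", chunk_size: 256*1024)

Tus::Server.opts[:storage] = storage

The prefix is used as the GridFS bucket name, so with the default "fs" the data ends up in the fs.files and fs.chunks collections.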
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
# File 'lib/tus/storage/gridfs.rb', line 18

def bucket
  @bucket
end
#chunk_size ⇒ Object (readonly)
Returns the value of attribute chunk_size.
# File 'lib/tus/storage/gridfs.rb', line 18

def chunk_size
  @chunk_size
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/tus/storage/gridfs.rb', line 18

def client
  @client
end
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
# File 'lib/tus/storage/gridfs.rb', line 18

def prefix
  @prefix
end
Instance Method Details
#concatenate(uid, part_uids, info = {}) ⇒ Object
Concatenates multiple partial uploads into a single upload, and returns the size of the resulting upload. The partial uploads are deleted after concatenation.
It concatenates by updating the partial uploads' GridFS chunks to point to the new upload.
Raises Tus::Error if GridFS chunks of partial uploads don’t exist or aren’t completely filled.
# File 'lib/tus/storage/gridfs.rb', line 49

def concatenate(uid, part_uids, info = {})
  grid_infos = files_collection.find(filename: {"$in" => part_uids}).to_a
  grid_infos.sort_by! { |grid_info| part_uids.index(grid_info[:filename]) }

  validate_parts!(grid_infos, part_uids)

  length       = grid_infos.map { |doc| doc[:length] }.reduce(0, :+)
  content_type = Tus::Info.new(info).type

  grid_file = create_grid_file(
    filename:     uid,
    length:       length,
    content_type: content_type,
  )

  # Update the chunks belonging to parts so that they point to the new file.
  grid_infos.inject(0) do |offset, grid_info|
    result = chunks_collection
      .find(files_id: grid_info[:_id])
      .update_many(
        "$set" => { files_id: grid_file.id },
        "$inc" => { n: offset },
      )

    offset += result.modified_count
  end

  # Delete the parts after concatenation.
  files_collection.delete_many(filename: {"$in" => part_uids})
end
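A hypothetical call, assuming storage is the instance from the setup sketch above and both partial uploads have already been fully written:

# "part-1" and "part-2" are hypothetical uids of completed partial uploads
storage.concatenate("final-upload", ["part-1", "part-2"])

The order of part_uids determines the order of the data in the resulting upload, since each part's chunks are renumbered by the accumulated chunk count of the parts before it.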
#create_file(uid, info = {}) ⇒ Object
Creates a file for the specified upload.
# File 'lib/tus/storage/gridfs.rb', line 31

def create_file(uid, info = {})
  content_type = Tus::Info.new(info).metadata["content_type"]

  create_grid_file(
    filename:     uid,
    content_type: content_type,
  )
end
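A sketch of creating an empty upload (the uid generation is an assumption; in practice the tus server supplies the uid and info hash):

require "securerandom"

uid = SecureRandom.hex  # hypothetical uid
storage.create_file(uid)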
#delete_file(uid, info = {}) ⇒ Object
Deletes the GridFS file and chunks for the specified upload.
# File 'lib/tus/storage/gridfs.rb', line 200

def delete_file(uid, info = {})
  grid_info = files_collection.find(filename: uid).first
  bucket.delete(grid_info[:_id]) if grid_info
end
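A one-line sketch, with uid as in the earlier sketches:

storage.delete_file(uid)  # removes the files document together with its chunks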
#expire_files(expiration_date) ⇒ Object
Deletes the GridFS files and chunks of uploads older than the specified date.
# File 'lib/tus/storage/gridfs.rb', line 206

def expire_files(expiration_date)
  grid_infos = files_collection.find(uploadDate: {"$lte" => expiration_date}).to_a
  grid_info_ids = grid_infos.map { |info| info[:_id] }

  files_collection.delete_many(_id: {"$in" => grid_info_ids})
  chunks_collection.delete_many(files_id: {"$in" => grid_info_ids})
end
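For example, a periodic cleanup job might expire everything whose upload date is more than a week old (the interval is an arbitrary choice for the sketch):

storage.expire_files(Time.now.utc - 7*24*60*60)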
#get_file(uid, info = {}, range: nil) ⇒ Object
Returns a Tus::Response object through which data of the specified upload can be retrieved in a streaming fashion. Accepts an optional range parameter for selecting a subset of bytes we want to retrieve.
# File 'lib/tus/storage/gridfs.rb', line 151

def get_file(uid, info = {}, range: nil)
  grid_info = files_collection.find(filename: uid).first

  filter = { files_id: grid_info[:_id] }

  if range
    chunk_start = range.begin / grid_info[:chunkSize]
    chunk_stop  = range.end / grid_info[:chunkSize]

    filter[:n] = {"$gte" => chunk_start, "$lte" => chunk_stop}
  end

  # Query only the subset of chunks specified by the range query. We
  # cannot use Mongo::FsBucket#open_download_stream here because it
  # doesn't support changing the filter.
  chunks_view = chunks_collection.find(filter).sort(n: 1)

  # Create an Enumerator which will yield chunks of the requested file
  # content, allowing tus server to efficiently stream requested content
  # to the client.
  chunks = Enumerator.new do |yielder|
    chunks_view.each do |document|
      data = document[:data].data

      if document[:n] == chunk_start && document[:n] == chunk_stop
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = range.end   % grid_info[:chunkSize]
      elsif document[:n] == chunk_start
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = grid_info[:chunkSize] - 1
      elsif document[:n] == chunk_stop
        byte_start = 0
        byte_stop  = range.end % grid_info[:chunkSize]
      end

      # If we're on the first or last chunk, return a subset of the chunk
      # specified by the given range, otherwise return the full chunk.
      if byte_start && byte_stop
        yielder << data[byte_start..byte_stop]
      else
        yielder << data
      end
    end
  end

  Tus::Response.new(chunks: chunks, close: chunks_view.method(:close_query))
end
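A sketch of reading back the first kilobyte of an upload (the uid and range are hypothetical; Tus::Response is assumed to expose the Rack-style #each and #close used by the server):

response = storage.get_file(uid, range: 0..1023)

content = String.new
response.each { |chunk| content << chunk }
response.close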
#patch_file(uid, input, info = {}) ⇒ Object
Appends data to the specified upload in a streaming fashion, and returns the number of bytes it managed to save.
It does so by reading the input data in batches of chunks, creating a new GridFS chunk for each chunk of data and appending it to the existing list.
# File 'lib/tus/storage/gridfs.rb', line 86

def patch_file(uid, input, info = {})
  grid_info      = files_collection.find(filename: uid).first
  current_length = grid_info[:length]
  chunk_size     = grid_info[:chunkSize]
  bytes_saved    = 0

  # It's possible that the previous data append didn't fill in the last
  # GridFS chunk completely, so we fill in that gap now before creating
  # new GridFS chunks.
  bytes_saved += patch_last_chunk(input, grid_info) if current_length % chunk_size != 0

  # Create an Enumerator which yields chunks of input data which have the
  # size of the configured :chunkSize of the GridFS file.
  chunks_enumerator = Enumerator.new do |yielder|
    while (data = input.read(chunk_size))
      yielder << data
    end
  end

  chunks_in_batch = (BATCH_SIZE.to_f / chunk_size).ceil
  chunks_offset   = chunks_collection.count(files_id: grid_info[:_id]) - 1

  # Iterate in batches of data chunks and bulk-insert new GridFS chunks.
  # This way we try to have a balance between bulking inserts and keeping
  # memory usage low.
  chunks_enumerator.each_slice(chunks_in_batch) do |chunks|
    grid_chunks = chunks.map do |data|
      Mongo::Grid::File::Chunk.new(
        data:     BSON::Binary.new(data),
        files_id: grid_info[:_id],
        n:        chunks_offset += 1,
      )
    end

    chunks_collection.insert_many(grid_chunks)

    # Update the total length and refresh the upload date on each update,
    # which are used in #get_file, #concatenate and #expire_files.
    files_collection.find(filename: uid).update_one(
      "$inc" => { length: chunks.map(&:bytesize).inject(0, :+) },
      "$set" => { uploadDate: Time.now.utc },
    )
    bytes_saved += chunks.map(&:bytesize).inject(0, :+)

    chunks.each(&:clear) # deallocate strings
  end

  bytes_saved
end
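A sketch of appending data from a file on disk (the path and uid are hypothetical; the input only needs to respond to #read(length), which a File does):

File.open("upload.bin", "rb") do |io|
  bytes_saved = storage.patch_file(uid, io)
  puts "appended #{bytes_saved} bytes"
end

Repeated calls keep appending: if the previous call left the last GridFS chunk partially filled, that gap is filled first before new chunks are inserted.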
#read_info(uid) ⇒ Object
Returns info of the specified upload. Raises Tus::NotFound if the upload wasn’t found.
# File 'lib/tus/storage/gridfs.rb', line 138

def read_info(uid)
  grid_info = files_collection.find(filename: uid).first or raise Tus::NotFound
  grid_info[:metadata]
end
#update_info(uid, info) ⇒ Object
Updates info of the specified upload.
# File 'lib/tus/storage/gridfs.rb', line 144

def update_info(uid, info)
  files_collection.update_one({filename: uid}, {"$set" => {metadata: info}})
end
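The info hash is stored in the metadata field of the GridFS files document, so reading it back and replacing it looks like this (the added key and value are hypothetical):

info = storage.read_info(uid)  # raises Tus::NotFound if the upload doesn't exist
storage.update_info(uid, info.merge("Foo" => "Bar"))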