Class: Aws::S3::TransferManager

- Inherits: Object
- Defined in: lib/aws-sdk-s3/transfer_manager.rb
Overview
A high-level S3 transfer utility that provides enhanced upload and download capabilities with automatic multipart handling, progress tracking, and efficient handling of large files. The following features are supported:

- upload a file with multipart upload
- upload a stream with multipart upload
- upload all files in a directory to an S3 bucket, recursively or non-recursively
- download an S3 object with multipart download
- download all objects under the same prefix in an S3 bucket to a local directory
- track transfer progress using a progress listener
## Executor Management

TransferManager uses executors to handle concurrent operations during multipart transfers. You can control concurrency behavior by providing a custom executor or relying on the default executor management.

### Default Behavior

When no :executor is provided, TransferManager creates a new DefaultExecutor for each individual operation (download_file, upload_file, etc.) and automatically shuts it down when that operation completes. Each operation gets its own isolated thread pool with the specified :thread_count (default: 10 threads).

### Custom Executor

You can provide your own executor (e.g., Concurrent::ThreadPoolExecutor) for fine-grained control over thread pools and resource management. When using a custom executor, you are responsible for shutting it down when finished. The executor may be reused across multiple TransferManager operations.
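As a sketch of that wiring (the bucket name, paths, and pool size below are placeholders, not part of the SDK docs), a shared concurrent-ruby pool might be passed in like this:

```ruby
require 'aws-sdk-s3'
require 'concurrent'

# One pool shared across many transfers; the caller owns its lifecycle.
executor = Concurrent::ThreadPoolExecutor.new(max_threads: 8)

tm = Aws::S3::TransferManager.new(executor: executor)
tm.upload_file('/path/to/file', bucket: 'my-bucket', key: 'backup/file')
tm.download_file('/path/to/copy', bucket: 'my-bucket', key: 'backup/file')

# With a custom executor, you must shut it down yourself when finished.
executor.shutdown
executor.wait_for_termination(30)
```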
Custom executors must implement the same interface as DefaultExecutor.
**Required methods:**
* `post(*args, &block)` - Execute a task with given arguments and block
* `kill` - Immediately terminate all running tasks
**Optional methods:**
* `shutdown(timeout = nil)` - Gracefully shutdown the executor with optional timeout
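A minimal sketch of an executor satisfying this interface (SimpleExecutor is an illustrative name, not part of the SDK; a production pool would bound the number of threads rather than spawn one per task):

```ruby
class SimpleExecutor
  def initialize
    @threads = []
  end

  # Execute a task with the given arguments and block (required).
  def post(*args, &block)
    @threads << Thread.new(*args, &block)
  end

  # Immediately terminate all running tasks (required).
  def kill
    @threads.each(&:kill)
  end

  # Gracefully wait for tasks to finish, honoring an optional timeout (optional).
  def shutdown(timeout = nil)
    @threads.each { |t| t.join(timeout) }
  end
end

results = Queue.new
executor = SimpleExecutor.new
3.times { |i| executor.post(i) { |n| results << n * n } }
executor.shutdown
# results now holds 0, 1, 4 in some order
```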
Instance Attribute Summary

- #client ⇒ S3::Client (readonly)
- #executor ⇒ Object (readonly)
- #logger ⇒ Logger (readonly)
Instance Method Summary

- #download_directory(destination, bucket:, **options) ⇒ Hash
  Downloads objects in an S3 bucket to a local directory.
- #download_file(destination, bucket:, key:, **options) ⇒ Boolean
  Downloads an S3 object to a path on disk.
- #initialize(options = {}) ⇒ TransferManager (constructor)
  A new instance of TransferManager.
- #upload_directory(source, bucket:, **options) ⇒ Hash
  Uploads all files under the given directory to the provided S3 bucket.
- #upload_file(source, bucket:, key:, **options) {|response| ... } ⇒ Boolean
  Uploads a file from disk to S3.
- #upload_stream(bucket:, key:, **options, &block) ⇒ Boolean
  Uploads a stream in a streaming fashion to S3.
Constructor Details
#initialize(options = {}) ⇒ TransferManager
Returns a new instance of TransferManager.
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 65

def initialize(options = {})
  @client = options[:client] || Client.new
  @executor = options[:executor]
  @logger = options[:logger]
end
```
Instance Attribute Details
#client ⇒ S3::Client (readonly)
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 72

def client
  @client
end
```
#executor ⇒ Object (readonly)
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 75

def executor
  @executor
end
```
#logger ⇒ Logger (readonly)
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 78

def logger
  @logger
end
```
Instance Method Details
#download_directory(destination, bucket:, **options) ⇒ Hash
Downloads objects in an S3 bucket to a local directory.

Note: On case-insensitive filesystems (e.g., Windows, macOS by default), S3 object keys that differ only by case (e.g., "File.txt" and "file.txt") may overwrite each other when downloaded. This condition is not automatically detected. Use the :filter_callback option to handle such conflicts if needed.
The downloaded directory structure will mirror the bucket's virtual directory structure. For example, assume that you have the following keys in your bucket:
- sample.jpg
- photos/2022/January/sample.jpg
- photos/2022/February/sample1.jpg
- photos/2022/February/sample2.jpg
- photos/2022/February/sample3.jpg
Given a request to download the bucket to a destination path of /test, the downloaded directory would look like this:
```
|- test
  |- sample.jpg
  |- photos
    |- 2022
      |- January
        |- sample.jpg
      |- February
        |- sample1.jpg
        |- sample2.jpg
        |- sample3.jpg
```
Directory markers (zero-byte objects whose keys end with /) are skipped during download. Existing files with the same name as downloaded objects will be overwritten. Object keys containing path traversal sequences (.. or .) will raise an error.
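A minimal call might look like this (the bucket name is a placeholder; :thread_count is the option described under Executor Management above):

```ruby
require 'aws-sdk-s3'

tm = Aws::S3::TransferManager.new
# Mirror the bucket's key structure under /test using up to 20 threads.
result = tm.download_directory('/test', bucket: 'my-bucket', thread_count: 20)
```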
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 161

def download_directory(destination, bucket:, **options)
  Aws::Plugins::UserAgent.metric('S3_TRANSFER', 'S3_TRANSFER_DOWNLOAD_DIRECTORY') do
    executor = @executor || DefaultExecutor.new(max_threads: options.delete(:thread_count))
    downloader = DirectoryDownloader.new(client: @client, executor: executor, logger: @logger)
    result = downloader.download(destination, bucket: bucket, **options)
    executor.shutdown unless @executor
    result
  end
end
```
#download_file(destination, bucket:, key:, **options) ⇒ Boolean
Downloads a file in S3 to a path on disk.
```ruby
# small files (< 5MB) are downloaded in a single API call
tm = TransferManager.new
tm.download_file('/path/to/file', bucket: 'bucket', key: 'key')
```
Files larger than 5MB are downloaded using the multipart method:

```ruby
# large files are split into parts and the parts are downloaded in parallel
tm.download_file('/path/to/large_file', bucket: 'bucket', key: 'key')
```
You can provide a callback to monitor progress of the download:
```ruby
# bytes and part_sizes are each an array with 1 entry per part
# part_sizes may not be known until the first bytes are retrieved
progress = proc do |bytes, part_sizes, file_size|
  bytes.map.with_index do |b, i|
    puts "Part #{i + 1}: #{b} / #{part_sizes[i]} Total: #{100.0 * bytes.sum / file_size}%"
  end
end
tm.download_file('/path/to/file', bucket: 'bucket', key: 'key', progress_callback: progress)
```
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 246

def download_file(destination, bucket:, key:, **options)
  download_opts = options.merge(bucket: bucket, key: key)
  executor = @executor || DefaultExecutor.new(max_threads: download_opts.delete(:thread_count))
  downloader = FileDownloader.new(client: @client, executor: executor)
  downloader.download(destination, download_opts)
  executor.shutdown unless @executor
  true
end
```
#upload_directory(source, bucket:, **options) ⇒ Hash
Uploads all files under the given directory to the provided S3 bucket. The key name transformation depends on the optional prefix.
By default, the directory is uploaded non-recursively (only top-level files are uploaded), and symbolic links are not followed automatically. Assume you have a local directory /test with the following structure:
```
|- test
  |- sample.jpg
  |- photos
    |- 2022
      |- January
        |- sample.jpg
      |- February
        |- sample1.jpg
        |- sample2.jpg
        |- sample3.jpg
```
Given a request to upload directory /test to an S3 bucket with default settings, the target bucket will have the following S3 object:

- sample.jpg
If :recursive is set to true, the target bucket will have the following S3 objects:

- sample.jpg
- photos/2022/January/sample.jpg
- photos/2022/February/sample1.jpg
- photos/2022/February/sample2.jpg
- photos/2022/February/sample3.jpg
Only regular files are uploaded; special files (sockets, pipes, devices) are skipped. Symlink cycles are detected and skipped when following symlinks. Empty directories are not represented in S3. Existing S3 objects with the same key are overwritten.
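The two modes above might be exercised like this (the bucket name is a placeholder):

```ruby
require 'aws-sdk-s3'

tm = Aws::S3::TransferManager.new
# Default: only top-level files in /test are uploaded.
tm.upload_directory('/test', bucket: 'my-bucket')
# Recursive: nested files become keys like photos/2022/January/sample.jpg.
tm.upload_directory('/test', bucket: 'my-bucket', recursive: true)
```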
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 361

def upload_directory(source, bucket:, **options)
  Aws::Plugins::UserAgent.metric('S3_TRANSFER', 'S3_TRANSFER_UPLOAD_DIRECTORY') do
    executor = @executor || DefaultExecutor.new(max_threads: options.delete(:thread_count))
    uploader = DirectoryUploader.new(client: @client, executor: executor, logger: @logger)
    result = uploader.upload(source, bucket, **options.merge(http_chunk_size: resolve_http_chunk_size(options)))
    executor.shutdown unless @executor
    result
  end
end
```
#upload_file(source, bucket:, key:, **options) {|response| ... } ⇒ Boolean
Uploads a file from disk to S3.
```ruby
# small files are uploaded with the PutObject API
tm = TransferManager.new
tm.upload_file('/path/to/small_file', bucket: 'bucket', key: 'key')
```
Files larger than or equal to :multipart_threshold are uploaded using the multipart upload APIs.

```ruby
# large files are automatically split into parts and the parts are uploaded in parallel
tm.upload_file('/path/to/large_file', bucket: 'bucket', key: 'key')
```
The response of the S3 upload API is yielded if a block is given.

```ruby
# API response will have etag value of the file
tm.upload_file('/path/to/file', bucket: 'bucket', key: 'key') do |response|
  etag = response.etag
end
```
You can provide a callback to monitor progress of the upload:
```ruby
# bytes and totals are each an array with 1 entry per part
progress = proc do |bytes, totals|
  bytes.map.with_index do |b, i|
    puts "Part #{i + 1}: #{b} / #{totals[i]} Total: #{100.0 * bytes.sum / totals.sum}%"
  end
end
tm.upload_file('/path/to/file', bucket: 'bucket', key: 'key', progress_callback: progress)
```
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 443

def upload_file(source, bucket:, key:, **options)
  upload_opts = options.merge(bucket: bucket, key: key)
  http_chunk_size = resolve_http_chunk_size(upload_opts)
  executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
  uploader = FileUploader.new(
    multipart_threshold: upload_opts.delete(:multipart_threshold),
    http_chunk_size: http_chunk_size,
    client: @client,
    executor: executor
  )
  response = uploader.upload(source, upload_opts)
  yield response if block_given?
  executor.shutdown unless @executor
  true
end
```
#upload_stream(bucket:, key:, **options, &block) ⇒ Boolean
Uploads a stream in a streaming fashion to S3.
Passed chunks are automatically split into multipart upload parts, and the parts are uploaded in parallel. This allows for streaming uploads that never touch the disk.

Note: There are known issues in JRuby versions before jruby-9.1.15.0, so avoid using this with older JRuby versions.
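A minimal streaming upload might look like this (the bucket name and key are placeholders; each write to the yielded stream feeds the multipart parts):

```ruby
require 'aws-sdk-s3'

tm = Aws::S3::TransferManager.new
tm.upload_stream(bucket: 'my-bucket', key: 'streamed.bin') do |write_stream|
  # Everything written here is buffered into parts and uploaded in
  # parallel, without ever being written to disk.
  10.times { write_stream << ('x' * 1024 * 1024) }
end
```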
```ruby
# File 'lib/aws-sdk-s3/transfer_manager.rb', line 512

def upload_stream(bucket:, key:, **options, &block)
  upload_opts = options.merge(bucket: bucket, key: key)
  executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
  uploader = MultipartStreamUploader.new(
    client: @client,
    executor: executor,
    tempfile: upload_opts.delete(:tempfile),
    part_size: upload_opts.delete(:part_size)
  )
  uploader.upload(upload_opts, &block)
  executor.shutdown unless @executor
  true
end
```