Conversation
|
Detected 1 possible performance regressions:
|
| # * `:completed_downloads` - Number of objects successfully downloaded | ||
| # * `:failed_downloads` - Number of objects that failed to download | ||
| # * `:errors` - Array of errors for failed downloads (only present when failures occur) | ||
| def download_directory(destination, bucket:, **options) |
There was a problem hiding this comment.
I need to allow for setting a custom thread for executor here. Same goes for uploader.
| request_abort unless opts[:ignore_failure] | ||
| end | ||
|
|
||
| def process_download_queue(producer, downloader, opts) |
There was a problem hiding this comment.
In a situation where we need to raise, I decided against doing @queue_executor.kill since there might be work in-flight that hangs. It would be best to exit gracefully and the bubbles up to raise within #build_results
| raise ArgumentError, 'Invalid directory' unless Dir.exist?(source_directory) | ||
|
|
||
| uploader = FileUploader.new( | ||
| multipart_threshold: opts.delete(:multipart_threshold), |
There was a problem hiding this comment.
Self-reminder to documentation this at TransferManager#upload_directory
I need to take a look at other params available on FileUploader and FileDownloader level but I'm concerned about overlapped params if it occurs.
alextwoods
left a comment
There was a problem hiding this comment.
Nice - this is looking good! Great test coverage and documentation. Good thread safety.
| end | ||
|
|
||
| def validate_path(path, key) | ||
| segments = path.split('/') |
There was a problem hiding this comment.
Should this be File::SEPERATOR rather than / here? I think the path at this point will come from File.join and so would have os seperator? I might be wrong about that though.
There was a problem hiding this comment.
I think I should reorder the normalization/validation here. You are correct - File.join will always use / separator based on documentation.
What I should actually do
- Validate that object key does not contain
.or..path segments first - Remove s3-prefix if applies
File.jointo combine destination directory and new key path- Update file separator if relevant
| def stream_objects(continuation_token: nil) | ||
| resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token) | ||
| resp.contents&.each do |o| | ||
| break if @directory_downloader.abort_requested |
There was a problem hiding this comment.
This will block on the mutex - I'm not sure we always need to that here. It is definitely safe to do so, but likely has a performance hit. Ditto I think with the check on line 91:
begin
producer.each do |object|
break if abort_requested
Since we're using a SizedQueue for communicating between threads - maybe we could use clear/close on it rather than constantly blocking on the mutex? If we used close It will cause threads waiting to raise ClosedQueueError which we could catch and handle to detect aborts? I haven't fully thought that out, but I think it might simplify the code and avoid locking as much.
There was a problem hiding this comment.
Good idea. I'll give it a whirl to see how it pans out.
| @transferred_bytes += bytes_transferred | ||
| @transferred_files += 1 | ||
|
|
||
| @progress_callback.call(@transferred_bytes, @transferred_files) |
There was a problem hiding this comment.
I see why we're calling the progress_callback inside the synchronize block - it does ensure progress is linear for users. However, depending on what they're doing in the callback (things like IO like printing/writing to file, ect) - this could end up being a small bottleneck. I'm not sure how much that matters vs the fully ordered progress callbacks.
There was a problem hiding this comment.
Good callout. How does Java handle their directory progress?
| @mutex.synchronize { errors << e } | ||
| request_abort | ||
| end | ||
| upload_attempts.times { completion_queue.pop } |
There was a problem hiding this comment.
Is there a chance that upload_attemps will ever be larger than the completion queue?
There was a problem hiding this comment.
I believe ensure will push :done regardless what happens.
richardwang1124
left a comment
There was a problem hiding this comment.
Nice! Looks pretty good overall. Left a few comments.
| # you are responsible for shutting it down when finished. | ||
| def initialize(options = {}) | ||
| @client = options[:client] || Client.new | ||
| @executor = options[:executor] |
There was a problem hiding this comment.
What if you added DefaultExecutor here? Like
@executor = options[:executor] || DefaultExecutor.new
There was a problem hiding this comment.
When I initially implemented TransferManager, I considered this but decided against it.
I need a way to gracefully shutdown executors when a custom one isn't provided, so I scoped the default executors to their own methods. The plan is: when a custom executor is given, we use it but don't shut it down after work finishes.
For default executors, we treat them as single-use (like the internal classes themselves) and shut them down. They are also quite limited and shouldn't share resources across methods. I need to track this at the TM-level to differentiate between custom and default executors.
| @mutex.synchronize { errors << e } | ||
| request_abort | ||
| end | ||
| download_attempts.times { completion_queue.pop } |
There was a problem hiding this comment.
Is it guaranteed that download_attempts will be less than or equal to completion_queue size?
There was a problem hiding this comment.
Yes, I think the key here is the ensure bloc always pushes :done even on error
Adds directory upload/download to Transfer Manager.
upload_directorydownload_directoryBy submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
To make sure we include your contribution in the release notes, please make sure to add description entry for your changes in the "unreleased changes" section of the
CHANGELOG.mdfile (at corresponding gem). For the description entry, please make sure it lives in one line and starts withFeatureorIssuein the correct format.For generated code changes, please checkout below instructions first:
https://github.com/aws/aws-sdk-ruby/blob/version-3/CONTRIBUTING.md
Thank you for your contribution!