Skip to content

Commit 0a857fa

Browse files
Fix DownloadPartProgressEventCallback race condition (#4196)
1 parent 1012cfd commit 0a857fa

File tree

1 file changed

+20
-6
lines changed

1 file changed

+20
-6
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ internal class MultipartDownloadManager : IDownloadManager
6262

6363
// Atomic flag to ensure completion event fires exactly once
6464
// Without this, concurrent parts completing simultaneously can both see
65-
// transferredBytes >= _totalObjectSize and fire duplicate completion events
66-
// Uses int instead of bool because Interlocked.CompareExchange requires reference types
67-
private int _completionEventFired = 0; // 0 = false, 1 = true
65+
// transferredBytes == _totalObjectSize and fire duplicate completion events
66+
// Uses long instead of bool for compatibility with Interlocked operations
67+
private long _completionEventFired = 0; // 0 = false, 1 = true
6868

6969
private readonly Logger _logger = Logger.GetLogger(typeof(MultipartDownloadManager));
7070

@@ -733,26 +733,40 @@ private WriteObjectProgressArgs CreateProgressArgs(long incrementTransferred, lo
733733
/// Progress aggregation callback that combines progress across all concurrent part downloads.
734734
/// Uses thread-safe counter increment to handle concurrent updates.
735735
/// Detects completion naturally when transferred bytes reaches total size.
736-
/// Uses atomic flag to ensure completion event fires exactly once.
736+
/// Uses atomic flag to ensure completion event fires exactly once and prevents any events after completion.
737737
/// </summary>
738738
private void DownloadPartProgressEventCallback(object sender, WriteObjectProgressArgs e)
739739
{
740740
long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred);
741741

742+
// Check if completion was already fired - if so, skip this event entirely
743+
// This prevents the race condition where per-part completion events arrive after
744+
// the aggregated completion event has already been fired
745+
if (Interlocked.Read(ref _completionEventFired) == 1)
746+
{
747+
return; // Already completed, don't fire any more events
748+
}
749+
742750
// Use atomic CompareExchange to ensure only first thread fires completion
743751
bool isComplete = false;
744-
if (transferredBytes >= _totalObjectSize)
752+
if (transferredBytes == _totalObjectSize)
745753
{
746754
// CompareExchange returns the original value before the exchange
747755
// If original value was 0 (false), we're the first thread and should fire completion
748-
int originalValue = Interlocked.CompareExchange(ref _completionEventFired, 1, 0);
756+
long originalValue = Interlocked.CompareExchange(ref _completionEventFired, 1, 0);
749757
if (originalValue == 0) // Was false, now set to true
750758
{
751759
isComplete = true;
752760
}
761+
else
762+
{
763+
// Another thread already fired completion, skip this event
764+
return;
765+
}
753766
}
754767

755768
// Create and fire aggregated progress event
769+
// Only reached if completion hasn't been fired yet
756770
var aggregatedArgs = CreateProgressArgs(e.IncrementTransferred, transferredBytes, isComplete);
757771
_userProgressCallback?.Invoke(this, aggregatedArgs);
758772
}

0 commit comments

Comments
 (0)