Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/sources/s3/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (p *Checkpointer) Reset() {
type ResumeInfo struct {
CurrentBucket string `json:"current_bucket"` // Current bucket being scanned
StartAfter string `json:"start_after"` // Last processed object key
Role string `json:"role"` // Role used for scanning
}

// ResumePoint retrieves the last saved checkpoint state if one exists.
Expand All @@ -121,7 +122,7 @@ func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error) {
return resume, nil
}

return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter}, nil
return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter, Role: resumeInfo.Role}, nil
}

// Complete marks the entire scanning operation as finished and clears the resume state.
Expand Down Expand Up @@ -215,7 +216,7 @@ func (p *Checkpointer) updateCheckpoint(bucket string, role string, lastKey stri
return nil
}

encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey})
encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey, Role: role})
if err != nil {
return fmt.Errorf("failed to encode resume info: %w", err)
}
Expand Down
180 changes: 180 additions & 0 deletions pkg/sources/s3/checkpointer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,56 @@ func TestCheckpointerResumption(t *testing.T) {
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
}

func TestCheckpointerResumptionWithRole(t *testing.T) {
ctx := context.Background()

// First scan - process 6 objects then interrupt.
initialProgress := &sources.Progress{}
tracker := NewCheckpointer(ctx, initialProgress)
role := "test-role"

firstPage := &s3.ListObjectsV2Output{
Contents: make([]s3types.Object, 12), // Total of 12 objects
}
for i := range 12 {
key := fmt.Sprintf("key-%d", i)
firstPage.Contents[i] = s3types.Object{Key: &key}
}

// Process first 6 objects.
for i := range 6 {
err := tracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, firstPage.Contents)
assert.NoError(t, err)
}

// Verify resume info is set correctly.
resumeInfo, err := tracker.ResumePoint(ctx)
require.NoError(t, err)
assert.Equal(t, "test-bucket", resumeInfo.CurrentBucket)
assert.Equal(t, "key-5", resumeInfo.StartAfter)
assert.Equal(t, role, resumeInfo.Role)

// Resume scan with existing progress.
resumeTracker := NewCheckpointer(ctx, initialProgress)

resumePage := &s3.ListObjectsV2Output{
Contents: firstPage.Contents[6:], // Remaining 6 objects
}

// Process remaining objects.
for i := range len(resumePage.Contents) {
err := resumeTracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, resumePage.Contents)
assert.NoError(t, err)
}

// Verify final resume info.
finalResumeInfo, err := resumeTracker.ResumePoint(ctx)
require.NoError(t, err)
assert.Equal(t, "test-bucket", finalResumeInfo.CurrentBucket)
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
assert.Equal(t, role, finalResumeInfo.Role)
}

func TestCheckpointerReset(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -111,6 +161,13 @@ func TestGetResumePoint(t *testing.T) {
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key"},
},
{
name: "valid resume info with role",
progress: &sources.Progress{
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key","role":"test-role"}`,
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: "test-role"},
},
{
name: "empty encoded resume info",
progress: &sources.Progress{EncodedResumeInfo: ""},
Expand All @@ -121,6 +178,13 @@ func TestGetResumePoint(t *testing.T) {
EncodedResumeInfo: `{"current_bucket":"","start_after":"test-key"}`,
},
},
{
name: "no role in resume info",
progress: &sources.Progress{
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key"}`,
},
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: ""},
},
{
name: "unmarshal error",
progress: &sources.Progress{
Expand Down Expand Up @@ -257,6 +321,122 @@ func TestCheckpointerUpdate(t *testing.T) {
})
}
}
func TestCheckpointerUpdateWithRole(t *testing.T) {
role := "test-role"
tests := []struct {
name string
description string
completedIdx int
pageSize int
preCompleted []int
expectedKey string
expectedRole string
expectedLowestIncomplete int
}{
{
name: "first object completed",
description: "Basic case - completing first object",
completedIdx: 0,
pageSize: 3,
expectedKey: "key-0",
expectedRole: role,
expectedLowestIncomplete: 1,
},
{
name: "completing missing middle",
description: "Completing object when previous is done",
completedIdx: 1,
pageSize: 3,
preCompleted: []int{0},
expectedKey: "key-1",
expectedRole: role,
expectedLowestIncomplete: 2,
},
{
name: "all objects completed in order",
description: "Completing final object in sequence",
completedIdx: 2,
pageSize: 3,
preCompleted: []int{0, 1},
expectedKey: "key-2",
expectedRole: role,
expectedLowestIncomplete: 3,
},
{
name: "out of order completion before lowest",
description: "Completing object before current lowest incomplete - should not affect checkpoint",
completedIdx: 1,
pageSize: 4,
preCompleted: []int{0, 2, 3},
expectedKey: "key-3",
expectedRole: role,
expectedLowestIncomplete: 4,
},
{
name: "last index in max page",
description: "Edge case - maximum page size boundary",
completedIdx: 999,
pageSize: 1000,
preCompleted: func() []int {
indices := make([]int, 999)
for i := range indices {
indices[i] = i
}
return indices
}(),
expectedKey: "key-999",
expectedRole: role,
expectedLowestIncomplete: 1000,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
progress := new(sources.Progress)
tracker := &Checkpointer{
progress: progress,
completedObjects: make([]bool, tt.pageSize),
completionOrder: make([]int, 0, tt.pageSize),
lowestIncompleteIdx: 0,
}

page := &s3.ListObjectsV2Output{Contents: make([]s3types.Object, tt.pageSize)}
for i := range tt.pageSize {
key := fmt.Sprintf("key-%d", i)
page.Contents[i] = s3types.Object{Key: &key}
}

// Setup pre-completed objects.
for _, idx := range tt.preCompleted {
tracker.completedObjects[idx] = true
tracker.completionOrder = append(tracker.completionOrder, idx)
}

// Find the correct lowest incomplete index after pre-completion.
for i := range tt.pageSize {
if !tracker.completedObjects[i] {
tracker.lowestIncompleteIdx = i
break
}
}

err := tracker.UpdateObjectCompletion(ctx, tt.completedIdx, "test-bucket", role, page.Contents)
assert.NoError(t, err, "Unexpected error updating progress")

var info ResumeInfo
err = json.Unmarshal([]byte(progress.EncodedResumeInfo), &info)
assert.NoError(t, err, "Failed to decode resume info")
assert.Equal(t, tt.expectedKey, info.StartAfter, "Incorrect resume point")
assert.Equal(t, tt.expectedRole, info.Role, "Incorrect role")

assert.Equal(t, tt.expectedLowestIncomplete, tracker.lowestIncompleteIdx,
"Incorrect lowest incomplete index")
})
}
}

func TestCheckpointerUpdateUnitScan(t *testing.T) {
ctx := context.Background()
Expand Down
6 changes: 5 additions & 1 deletion pkg/sources/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type resumePosition struct {
startAfter string // The last processed object key within the bucket
isNewScan bool // True if we're starting a fresh scan
exactMatch bool // True if we found the exact bucket we were previously processing
role string // The role used during the previous scan
}

// determineResumePosition calculates where to resume scanning from based on the last saved checkpoint
Expand Down Expand Up @@ -282,6 +283,7 @@ func determineResumePosition(ctx context.Context, tracker *Checkpointer, buckets
startAfter: resumePoint.StartAfter,
index: startIdx,
exactMatch: found,
role: resumePoint.Role,
}
}

Expand All @@ -306,12 +308,14 @@ func (s *Source) scanBuckets(
"Resume bucket no longer available, starting from closest position",
"original_bucket", pos.bucket,
"position", pos.index,
"role", pos.role,
)
default:
ctx.Logger().Info(
"Resuming scan from previous scan's bucket",
"bucket", pos.bucket,
"position", pos.index,
"role", pos.role,
)
}

Expand All @@ -327,7 +331,7 @@ func (s *Source) scanBuckets(
)

var startAfter *string
if bucket == pos.bucket && pos.startAfter != "" {
if bucket == pos.bucket && pos.startAfter != "" && role == pos.role {
startAfter = &pos.startAfter
ctx.Logger().V(3).Info(
"Resuming bucket scan",
Expand Down
Loading