-
Notifications
You must be signed in to change notification settings - Fork 25
fix: Flush pending inserts in streamingbatchwriter on delete records #2408
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -624,3 +624,61 @@ | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| return -1 | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| func TestDeleteRecordFlushesPendingInserts(t *testing.T) { | ||||||||||||||||||||||||||||||||||||
| t.Parallel() | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| ctx := context.Background() | ||||||||||||||||||||||||||||||||||||
| errCh := make(chan error, 10) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| testClient := newClient() | ||||||||||||||||||||||||||||||||||||
| wr, err := New(testClient, WithBatchSizeRows(1000000)) // large batch to avoid auto-flush | ||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||
| t.Fatal(err) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Create a table for insert | ||||||||||||||||||||||||||||||||||||
| insertTable := &schema.Table{ | ||||||||||||||||||||||||||||||||||||
| Name: "child_table", | ||||||||||||||||||||||||||||||||||||
| Columns: []schema.Column{ | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| Name: "id", | ||||||||||||||||||||||||||||||||||||
| Type: arrow.PrimitiveTypes.Int64, | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Build insert record | ||||||||||||||||||||||||||||||||||||
| bldr := array.NewRecordBuilder(memory.DefaultAllocator, insertTable.ToArrowSchema()) | ||||||||||||||||||||||||||||||||||||
| bldr.Field(0).(*array.Int64Builder).Append(1) | ||||||||||||||||||||||||||||||||||||
| record := bldr.NewRecord() | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| md := arrow.NewMetadata( | ||||||||||||||||||||||||||||||||||||
| []string{schema.MetadataTableName}, | ||||||||||||||||||||||||||||||||||||
| []string{insertTable.Name}, | ||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||
| newSchema := arrow.NewSchema( | ||||||||||||||||||||||||||||||||||||
| record.Schema().Fields(), | ||||||||||||||||||||||||||||||||||||
| &md, | ||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| record = array.NewRecord(newSchema, record.Columns(), record.NumRows()) | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
Comment on lines
+653
to
+666
|
||||||||||||||||||||||||||||||||||||
| bldr.Field(0).(*array.Int64Builder).Append(1) | |
| record := bldr.NewRecord() | |
| md := arrow.NewMetadata( | |
| []string{schema.MetadataTableName}, | |
| []string{insertTable.Name}, | |
| ) | |
| newSchema := arrow.NewSchema( | |
| record.Schema().Fields(), | |
| &md, | |
| ) | |
| record = array.NewRecord(newSchema, record.Columns(), record.NumRows()) | |
| defer bldr.Release() | |
| bldr.Field(0).(*array.Int64Builder).Append(1) | |
| record := bldr.NewRecord() | |
| defer record.Release() |
Copilot
AI
Feb 16, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test ignores the error from wr.Close(ctx). If Close fails (or hangs) this can mask real issues and leak goroutines into subsequent tests. Consider checking the returned error (or using t.Cleanup to close and assert).
| _ = wr.Close(ctx) | |
| if err := wr.Close(ctx); err != nil { | |
| t.Fatal(err) | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushInsertWorkerscan return early onctx.Done()after the flush request has been received by the worker but before this function reads fromdone. In that case the worker goroutine will block forever ondone <- true, effectively deadlocking that worker (and potentially the writer). Consider makingdonea buffered channel (size 1) and/or ensuring the ack is always drainable even when the context is canceled.