Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion writers/streamingbatchwriter/streamingbatchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@
return nil // not checked below
}

func (w *StreamingBatchWriter) flushInsertWorkers(ctx context.Context) error {
w.workersLock.RLock()
workers := make([]*streamingWorkerManager[*message.WriteInsert], 0, len(w.insertWorkers))
for _, worker := range w.insertWorkers {
workers = append(workers, worker)
}
w.workersLock.RUnlock()

for _, worker := range workers {
done := make(chan bool)
select {
case <-ctx.Done():
return ctx.Err()
case worker.flush <- done:
}
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
}
Comment on lines +166 to +176
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushInsertWorkers can return early on ctx.Done() after the flush request has been received by the worker but before this function reads from done. In that case the worker goroutine will block forever on done <- true, effectively deadlocking that worker (and potentially the writer). Consider making done a buffered channel (size 1) and/or ensuring the ack is always drainable even when the context is canceled.

Copilot uses AI. Check for mistakes.
}
return nil
}


Check failure on line 181 in writers/streamingbatchwriter/streamingbatchwriter.go

View workflow job for this annotation

GitHub Actions / Lint with GolangCI

File is not properly formatted (gofmt)
func (w *StreamingBatchWriter) Close(context.Context) error {
w.workersLock.Lock()
defer w.workersLock.Unlock()
Expand Down Expand Up @@ -323,6 +348,10 @@

return nil
case *message.WriteDeleteRecord:
// flush pending inserts and table buffers before deletions
if err := w.flushInsertWorkers(ctx); err != nil {
return err
}
w.workersLock.Lock()
defer w.workersLock.Unlock()

Expand All @@ -331,7 +360,6 @@
return nil
}

// TODO: flush all workers for nested tables as well (See https://github.com/cloudquery/plugin-sdk/issues/1296)
w.deleteRecordWorker = &streamingWorkerManager[*message.WriteDeleteRecord]{
ch: make(chan *message.WriteDeleteRecord),
writeFunc: w.client.DeleteRecords,
Expand Down Expand Up @@ -516,3 +544,6 @@
}
}
}



58 changes: 58 additions & 0 deletions writers/streamingbatchwriter/streamingbatchwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,3 +624,61 @@
}
return -1
}

func TestDeleteRecordFlushesPendingInserts(t *testing.T) {
t.Parallel()

ctx := context.Background()
errCh := make(chan error, 10)

testClient := newClient()
wr, err := New(testClient, WithBatchSizeRows(1000000)) // large batch to avoid auto-flush
if err != nil {
t.Fatal(err)
}

// Create a table for insert
insertTable := &schema.Table{
Name: "child_table",
Columns: []schema.Column{
{
Name: "id",
Type: arrow.PrimitiveTypes.Int64,
},
},
}

// Build insert record
bldr := array.NewRecordBuilder(memory.DefaultAllocator, insertTable.ToArrowSchema())
bldr.Field(0).(*array.Int64Builder).Append(1)
record := bldr.NewRecord()

Check failure on line 654 in writers/streamingbatchwriter/streamingbatchwriter_test.go

View workflow job for this annotation

GitHub Actions / Lint with GolangCI

SA1019: bldr.NewRecord is deprecated: Use [NewRecordBatch] instead. (staticcheck)

md := arrow.NewMetadata(
[]string{schema.MetadataTableName},
[]string{insertTable.Name},
)
newSchema := arrow.NewSchema(
record.Schema().Fields(),
&md,
)

record = array.NewRecord(newSchema, record.Columns(), record.NumRows())

Check failure on line 665 in writers/streamingbatchwriter/streamingbatchwriter_test.go

View workflow job for this annotation

GitHub Actions / Lint with GolangCI

SA1019: array.NewRecord is deprecated: Use [NewRecordBatch] instead. (staticcheck)

Comment on lines +653 to +666
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test builds a record via NewRecord() and then wraps it with array.NewRecord(...) without releasing the builder/records. With Arrow's ref-counted memory, this can leak allocations across the test run. Prefer using NewRecordBatch() (as other tests do) and ensure the RecordBuilder/record(s) are released when no longer needed (and avoid the extra schema/metadata re-wrap if the table schema already includes the table-name metadata).

Suggested change
bldr.Field(0).(*array.Int64Builder).Append(1)
record := bldr.NewRecord()
md := arrow.NewMetadata(
[]string{schema.MetadataTableName},
[]string{insertTable.Name},
)
newSchema := arrow.NewSchema(
record.Schema().Fields(),
&md,
)
record = array.NewRecord(newSchema, record.Columns(), record.NumRows())
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).Append(1)
record := bldr.NewRecord()
defer record.Release()

Copilot uses AI. Check for mistakes.
// Send insert

Check failure on line 667 in writers/streamingbatchwriter/streamingbatchwriter_test.go

View workflow job for this annotation

GitHub Actions / Lint with GolangCI

File is not properly formatted (gofmt)
if err := wr.startWorker(ctx, errCh, &message.WriteInsert{Record: record}); err != nil {
t.Fatal(err)
}

// send delete record to trigger flush
del := &message.WriteDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: insertTable.Name,
},
}

if err := wr.startWorker(ctx, errCh, del); err != nil {
t.Fatal(err)
}
waitForLength(t, testClient.MessageLen, messageTypeInsert, 1)
_ = wr.Close(ctx)
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test ignores the error from wr.Close(ctx). If Close fails (or hangs) this can mask real issues and leak goroutines into subsequent tests. Consider checking the returned error (or using t.Cleanup to close and assert).

Suggested change
_ = wr.Close(ctx)
if err := wr.Close(ctx); err != nil {
t.Fatal(err)
}

Copilot uses AI. Check for mistakes.
}
Loading