fix: Fix race condition on batchsender when sending resources from multiple goroutines (#2405)
Conversation
Pull request overview
This PR fixes a race condition in the BatchSender component that occurs when multiple goroutines call Send() concurrently while timer-triggered flushes are also happening. The race was caused by insufficient locking around shared state access between timer callbacks and Send() calls.
Changes:
- Refactored locking strategy to hold a single mutex for the entire duration of Send() operations and timer callbacks
- Renamed mutex from `itemsLock` to `mu` for consistency with codebase conventions
- Added a race condition test that exercises concurrent Send() calls with timer flushes
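The PR page doesn't show the fixed implementation itself, only the description above. A minimal sketch of the locking strategy it describes (one mutex held for the entirety of Send(), the timer callback, and Close(), with a `flushLocked()` helper that assumes the lock is held) might look like the following; the field names and `batchTimeout` value are assumptions, not the actual CloudQuery code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batchTimeout is an assumed value; the real constant lives in the package.
const batchTimeout = 50 * time.Millisecond

// BatchSender sketches the structure the PR describes: a single mutex
// (mu, renamed from itemsLock) guards all shared state.
type BatchSender struct {
	mu    sync.Mutex
	items []any
	timer *time.Timer
	send  func(items any)
}

func NewBatchSender(send func(items any)) *BatchSender {
	return &BatchSender{send: send}
}

// Send holds mu for its entire duration, so it cannot race with a
// concurrent timer-triggered flush or another Send.
func (bs *BatchSender) Send(item any) {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	bs.items = append(bs.items, item)
	if bs.timer == nil {
		bs.timer = time.AfterFunc(batchTimeout, func() {
			// The timer callback runs on its own goroutine, so it
			// must acquire the same lock before touching bs.items.
			bs.mu.Lock()
			defer bs.mu.Unlock()
			bs.flushLocked()
		})
	}
}

// flushLocked must only be called with bs.mu held (hence the suffix,
// matching the rename mentioned in the review table).
func (bs *BatchSender) flushLocked() {
	if len(bs.items) == 0 {
		return
	}
	bs.send(bs.items)
	bs.items = nil
	if bs.timer != nil {
		bs.timer.Stop()
		bs.timer = nil
	}
}

// Close flushes any remaining items under the same lock.
func (bs *BatchSender) Close() {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	bs.flushLocked()
}

func main() {
	var batches int
	bs := NewBatchSender(func(items any) { batches++ })
	bs.Send("a")
	bs.Send("b")
	bs.Close() // flushes both items in one batch before the timer fires
	fmt.Println("batches:", batches)
}
```

Note that in this sketch the `send` callback is invoked with `mu` held, so the callback must not call back into the BatchSender, or it would deadlock.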
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| scheduler/batchsender/batch_sender.go | Fixed race condition by holding lock during entire Send() operation, acquiring lock in timer callbacks, and adding lock to Close() method; renamed internal method to flushLocked() |
| scheduler/batchsender/batch_sender_test.go | Added concurrent test to verify no data races when sending from multiple goroutines with timer flushes |
```go
// This test verifies there is no data race between Send() and the
// timer-triggered flush.
func TestSend_ConcurrentWithTimerFlush(_ *testing.T) {
	// The race occurs when:
	//  1. a Send() call schedules a timer via time.AfterFunc,
	//  2. the timer fires and calls flush() on a separate goroutine,
	//  3. another Send() reads bs.items concurrently.
	//
	// To trigger this, we send items from multiple goroutines with
	// delays around batchTimeout so the timer fires between Sends.
	var mu sync.Mutex
	var received []any

	const numGoroutines = 5
	const sendsPerGoroutine = 20

	bs := NewBatchSender(func(items any) {
		mu.Lock()
		defer mu.Unlock()
		received = append(received, items)
	})

	var wg sync.WaitGroup
	wg.Add(numGoroutines)
	for range numGoroutines {
		go func() {
			defer wg.Done()
			for range sendsPerGoroutine {
				bs.Send("item")
				time.Sleep(batchTimeout + 10*time.Millisecond)
			}
		}()
	}

	wg.Wait()
	bs.Close()
}
```
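Worth noting: a test like this only detects the race when run with Go's race detector enabled, e.g. (the package path is taken from the review table above):

```shell
go test -race ./scheduler/batchsender/...
```

Without `-race`, concurrent unsynchronized access may pass silently even when the bug is present.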
The test verifies there is no data race but doesn't verify correctness of behavior. Consider adding assertions to verify that all items were received and that the batching behavior is correct. For example, you could check the total number of items received, or verify that batch sizes are appropriate. This is particularly important since the test uses synchronization primitives (mutex around received slice) but doesn't validate the data itself.
🤖 I have created a release *beep* *boop*

---

## [4.94.2](v4.94.1...v4.94.2) (2026-02-06)

### Bug Fixes

* **deps:** Update module github.com/cloudquery/codegen to v0.3.36 ([#2403](#2403)) ([b7188f1](b7188f1))
* Fix race condition on batchsender when sending resources from multiple goroutines ([#2405](#2405)) ([a0e2801](a0e2801))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
No description provided.