From e5c836163df4f1551b858a441afc4759b28b1e84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 6 Feb 2026 16:59:19 +0200 Subject: [PATCH 1/3] fix: Fix race condition on calling Send from multiple scheduler goroutines --- scheduler/batchsender/batch_sender.go | 43 ++++++++++++++------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go index 941b6aef83..8e0bdfb919 100644 --- a/scheduler/batchsender/batch_sender.go +++ b/scheduler/batchsender/batch_sender.go @@ -19,10 +19,10 @@ const ( // - If the current batch has reached the batch size, it will be sent immediately // - Otherwise, a timer will be started to send the current batch after the batch timeout type BatchSender struct { - sendFn func(any) - items []any - timer *time.Timer - itemsLock sync.Mutex + sendFn func(any) + items []any + timer *time.Timer + mu sync.Mutex } func NewBatchSender(sendFn func(any)) *BatchSender { @@ -30,6 +30,9 @@ func NewBatchSender(sendFn func(any)) *BatchSender { } func (bs *BatchSender) Send(item any) { + bs.mu.Lock() + defer bs.mu.Unlock() + if bs.timer != nil { bs.timer.Stop() } @@ -39,34 +42,29 @@ func (bs *BatchSender) Send(item any) { // If item is already a slice, send it directly // together with the current batch if len(items) > 1 { - bs.flush(items...) + bs.flushLocked(items...) return } // Otherwise, add item to the current batch - bs.appendToBatch(items...) + bs.items = append(bs.items, items...) // If the current batch has reached the batch size, send it if len(bs.items) >= batchSize { - bs.flush() + bs.flushLocked() return } // Otherwise, start a timer to send the current batch after the batch timeout - bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() }) + bs.timer = time.AfterFunc(batchTimeout, func() { + bs.mu.Lock() + defer bs.mu.Unlock() + bs.flushLocked() + }) } -func (bs *BatchSender) appendToBatch(items ...any) { - bs.itemsLock.Lock() - defer bs.itemsLock.Unlock() - - bs.items = append(bs.items, items...) -} - -func (bs *BatchSender) flush(items ...any) { - bs.itemsLock.Lock() - defer bs.itemsLock.Unlock() - +// flushLocked sends all buffered items. Must be called with bs.mu held. +func (bs *BatchSender) flushLocked(items ...any) { bs.items = append(bs.items, items...) if len(bs.items) == 0 { @@ -78,8 +76,11 @@ func (bs *BatchSender) flush(items ...any) { } func (bs *BatchSender) Close() { + bs.mu.Lock() + defer bs.mu.Unlock() + if bs.timer != nil { bs.timer.Stop() } - bs.flush() -} + bs.flushLocked() +} \ No newline at end of file From 8be4b480a365400376af630e1418bf2c984af204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 6 Feb 2026 17:09:31 +0200 Subject: [PATCH 2/3] feat: Add test for validating the fix --- scheduler/batchsender/batch_sender_test.go | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 scheduler/batchsender/batch_sender_test.go diff --git a/scheduler/batchsender/batch_sender_test.go b/scheduler/batchsender/batch_sender_test.go new file mode 100644 index 0000000000..de89712619 --- /dev/null +++ b/scheduler/batchsender/batch_sender_test.go @@ -0,0 +1,43 @@ +package batchsender + +import ( + "sync" + "testing" + "time" +) + +// This test verifies there is no data race between Send() and the timer-triggered flush. +func TestSend_ConcurrentWithTimerFlush(t *testing.T) { + // The race occurs when: + // 1. a Send() call schedules a timer via time.AfterFunc + // 2. the timer fires and calls flush() on a separate goroutine + // 3. another Send() reads bs.items concurrently. + // + // To trigger this, we send items from multiple goroutines with delays around batchTimeout so the timer fires between Sends. + var mu sync.Mutex + var received []any + + const numGoroutines = 5 + const sendsPerGoroutine = 20 + + bs := NewBatchSender(func(items any) { + mu.Lock() + defer mu.Unlock() + received = append(received, items) + }) + + var wg sync.WaitGroup + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + for range sendsPerGoroutine { + bs.Send("item") + time.Sleep(batchTimeout + 10*time.Millisecond) + } + }() + } + + wg.Wait() + bs.Close() +} From 8b148e57c0a89928bbadc285cc429c635c6d56ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 6 Feb 2026 17:16:50 +0200 Subject: [PATCH 3/3] fix: Lint --- scheduler/batchsender/batch_sender.go | 2 +- scheduler/batchsender/batch_sender_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go index 8e0bdfb919..a851814a44 100644 --- a/scheduler/batchsender/batch_sender.go +++ b/scheduler/batchsender/batch_sender.go @@ -83,4 +83,4 @@ func (bs *BatchSender) Close() { bs.timer.Stop() } bs.flushLocked() -} \ No newline at end of file +} diff --git a/scheduler/batchsender/batch_sender_test.go b/scheduler/batchsender/batch_sender_test.go index de89712619..402cb94042 100644 --- a/scheduler/batchsender/batch_sender_test.go +++ b/scheduler/batchsender/batch_sender_test.go @@ -7,7 +7,7 @@ import ( ) // This test verifies there is no data race between Send() and the timer-triggered flush. -func TestSend_ConcurrentWithTimerFlush(t *testing.T) { +func TestSend_ConcurrentWithTimerFlush(_ *testing.T) { // The race occurs when: // 1. a Send() call schedules a timer via time.AfterFunc // 2. the timer fires and calls flush() on a separate goroutine