diff --git a/scheduler/batchsender/batch_sender.go b/scheduler/batchsender/batch_sender.go
index 941b6aef83..a851814a44 100644
--- a/scheduler/batchsender/batch_sender.go
+++ b/scheduler/batchsender/batch_sender.go
@@ -19,10 +19,10 @@ const (
 // - If the current batch has reached the batch size, it will be sent immediately
 // - Otherwise, a timer will be started to send the current batch after the batch timeout
 type BatchSender struct {
-	sendFn    func(any)
-	items     []any
-	timer     *time.Timer
-	itemsLock sync.Mutex
+	sendFn func(any)
+	items  []any
+	timer  *time.Timer
+	mu     sync.Mutex
 }
 
 func NewBatchSender(sendFn func(any)) *BatchSender {
@@ -30,6 +30,9 @@ func NewBatchSender(sendFn func(any)) *BatchSender {
 }
 
 func (bs *BatchSender) Send(item any) {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+
 	if bs.timer != nil {
 		bs.timer.Stop()
 	}
@@ -39,34 +42,29 @@ func (bs *BatchSender) Send(item any) {
 	// If item is already a slice, send it directly
 	// together with the current batch
 	if len(items) > 1 {
-		bs.flush(items...)
+		bs.flushLocked(items...)
 		return
 	}
 
 	// Otherwise, add item to the current batch
-	bs.appendToBatch(items...)
+	bs.items = append(bs.items, items...)
 
 	// If the current batch has reached the batch size, send it
 	if len(bs.items) >= batchSize {
-		bs.flush()
+		bs.flushLocked()
 		return
 	}
 
 	// Otherwise, start a timer to send the current batch after the batch timeout
-	bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
-}
-
-func (bs *BatchSender) appendToBatch(items ...any) {
-	bs.itemsLock.Lock()
-	defer bs.itemsLock.Unlock()
-
-	bs.items = append(bs.items, items...)
+	bs.timer = time.AfterFunc(batchTimeout, func() {
+		bs.mu.Lock()
+		defer bs.mu.Unlock()
+		bs.flushLocked()
+	})
 }
 
-func (bs *BatchSender) flush(items ...any) {
-	bs.itemsLock.Lock()
-	defer bs.itemsLock.Unlock()
-
+// flushLocked sends all buffered items. Must be called with bs.mu held.
+func (bs *BatchSender) flushLocked(items ...any) {
 	bs.items = append(bs.items, items...)
 
 	if len(bs.items) == 0 {
@@ -78,8 +76,11 @@ func (bs *BatchSender) flush(items ...any) {
 }
 
 func (bs *BatchSender) Close() {
+	bs.mu.Lock()
+	defer bs.mu.Unlock()
+
 	if bs.timer != nil {
 		bs.timer.Stop()
 	}
-	bs.flush()
+	bs.flushLocked()
 }
diff --git a/scheduler/batchsender/batch_sender_test.go b/scheduler/batchsender/batch_sender_test.go
new file mode 100644
index 0000000000..402cb94042
--- /dev/null
+++ b/scheduler/batchsender/batch_sender_test.go
@@ -0,0 +1,43 @@
+package batchsender
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+// This test verifies there is no data race between Send() and the timer-triggered flush.
+func TestSend_ConcurrentWithTimerFlush(_ *testing.T) {
+	// The race occurs when:
+	// 1. a Send() call schedules a timer via time.AfterFunc
+	// 2. the timer fires and calls flush() on a separate goroutine
+	// 3. another Send() reads bs.items concurrently.
+	//
+	// To trigger this, we send items from multiple goroutines with delays around batchTimeout so the timer fires between Sends.
+	var mu sync.Mutex
+	var received []any
+
+	const numGoroutines = 5
+	const sendsPerGoroutine = 20
+
+	bs := NewBatchSender(func(items any) {
+		mu.Lock()
+		defer mu.Unlock()
+		received = append(received, items)
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(numGoroutines)
+	for range numGoroutines {
+		go func() {
+			defer wg.Done()
+			for range sendsPerGoroutine {
+				bs.Send("item")
+				time.Sleep(batchTimeout + 10*time.Millisecond)
+			}
+		}()
+	}
+
+	wg.Wait()
+	bs.Close()
+}