Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions scheduler/batchsender/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ const (
// - If the current batch has reached the batch size, it will be sent immediately
// - Otherwise, a timer will be started to send the current batch after the batch timeout
type BatchSender struct {
sendFn func(any)
items []any
timer *time.Timer
itemsLock sync.Mutex
sendFn func(any)
items []any
timer *time.Timer
mu sync.Mutex
}

func NewBatchSender(sendFn func(any)) *BatchSender {
return &BatchSender{sendFn: sendFn}
}

func (bs *BatchSender) Send(item any) {
bs.mu.Lock()
defer bs.mu.Unlock()

if bs.timer != nil {
bs.timer.Stop()
}
Expand All @@ -39,34 +42,29 @@ func (bs *BatchSender) Send(item any) {
// If item is already a slice, send it directly
// together with the current batch
if len(items) > 1 {
bs.flush(items...)
bs.flushLocked(items...)
return
}

// Otherwise, add item to the current batch
bs.appendToBatch(items...)
bs.items = append(bs.items, items...)

// If the current batch has reached the batch size, send it
if len(bs.items) >= batchSize {
bs.flush()
bs.flushLocked()
return
}

// Otherwise, start a timer to send the current batch after the batch timeout
bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
}

func (bs *BatchSender) appendToBatch(items ...any) {
bs.itemsLock.Lock()
defer bs.itemsLock.Unlock()

bs.items = append(bs.items, items...)
bs.timer = time.AfterFunc(batchTimeout, func() {
bs.mu.Lock()
defer bs.mu.Unlock()
bs.flushLocked()
})
}

func (bs *BatchSender) flush(items ...any) {
bs.itemsLock.Lock()
defer bs.itemsLock.Unlock()

// flushLocked sends all buffered items. Must be called with bs.mu held.
func (bs *BatchSender) flushLocked(items ...any) {
bs.items = append(bs.items, items...)

if len(bs.items) == 0 {
Expand All @@ -78,8 +76,11 @@ func (bs *BatchSender) flush(items ...any) {
}

func (bs *BatchSender) Close() {
bs.mu.Lock()
defer bs.mu.Unlock()

if bs.timer != nil {
bs.timer.Stop()
}
bs.flush()
bs.flushLocked()
}
43 changes: 43 additions & 0 deletions scheduler/batchsender/batch_sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package batchsender

import (
"sync"
"testing"
"time"
)

// This test verifies there is no data race between Send() and the timer-triggered flush.
func TestSend_ConcurrentWithTimerFlush(_ *testing.T) {
// The race occurs when:
// 1. a Send() call schedules a timer via time.AfterFunc
// 2. the timer fires and calls flush() on a separate goroutine
// 3. another Send() reads bs.items concurrently.
//
// To trigger this, we send items from multiple goroutines with delays around batchTimeout so the timer fires between Sends.
var mu sync.Mutex
var received []any

const numGoroutines = 5
const sendsPerGoroutine = 20

bs := NewBatchSender(func(items any) {
mu.Lock()
defer mu.Unlock()
received = append(received, items)
})

var wg sync.WaitGroup
wg.Add(numGoroutines)
for range numGoroutines {
go func() {
defer wg.Done()
for range sendsPerGoroutine {
bs.Send("item")
time.Sleep(batchTimeout + 10*time.Millisecond)
}
}()
}

wg.Wait()
bs.Close()
}
Comment on lines +9 to +43
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test verifies there is no data race but doesn't verify correctness of behavior. Consider adding assertions to verify that all items were received and that the batching behavior is correct. For example, you could check the total number of items received, or verify that batch sizes are appropriate. This is particularly important since the test uses synchronization primitives (mutex around received slice) but doesn't validate the data itself.

Copilot uses AI. Check for mistakes.