Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. Values over 64 MB are clamped to 64 MB. [PR #1157](https://github.com/riverqueue/river/pull/1157).
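
For illustration only (not part of the PR's diff), a minimal sketch of lowering the cap below the default, assuming `riverlog.NewMiddleware` accepts the slog handler constructor plus a `*MiddlewareConfig` and returns a `*riverlog.Middleware`, as the benchmark added in this PR suggests:

```go
package example

import (
	"io"
	"log/slog"

	"github.com/riverqueue/river/riverlog"
)

// newLogMiddleware lowers the per-job cap on persisted river:log history from
// the 8 MB default to 4 MB. Once the cap is exceeded, the oldest attempts'
// logs are dropped first; the newest entry is always retained.
func newLogMiddleware() *riverlog.Middleware {
	return riverlog.NewMiddleware(
		func(w io.Writer) slog.Handler { return slog.NewTextHandler(w, nil) },
		&riverlog.MiddlewareConfig{MaxTotalBytes: 4 * 1024 * 1024},
	)
}
```
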
Contributor Author:

@brandur I should have called this out earlier: does this 8 MB default seem reasonable to you? Do you feel it should be called out as "breaking", or is "changed" sufficient?

We can tweak it in a follow-up before release if needed.

Contributor:

Yeah, I saw it in there so I can't complain. I could see it potentially on the low side, but it really depends on the use case. How did you pick 8 MB anyway?

Just re-reading this, we might want to update this comment on MaxSizeBytes and call out that you'll likely hit MaxTotalBytes way before you hit something like 50 MB:

	// Be careful with this number because the maximum total log size is equal
	// to maximum number of attempts multiplied by this number (each attempt's
	// logs are kept separately). For example, 25 * 2 MB = 50 MB maximum
	// theoretical log size. Log data goes into metadata which is a JSONB field,
	// and JSONB fields have a maximum size of 255 MB, so any number larger than
	// 255 divided by maximum number of attempts may cause serious operational
	// problems.
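
(Rough arithmetic with the defaults in this diff, for scale: 25 attempts × 2 MB `MaxSizeBytes` is the 50 MB theoretical maximum from the comment above, but with `MaxTotalBytes` at 8 MB the oldest entries start being dropped once roughly four 2 MB attempts have accumulated, so the persisted history stays well under 50 MB.)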

- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157).

## [0.31.0] - 2026-02-21

### Added
188 changes: 169 additions & 19 deletions riverlog/river_log.go
@@ -8,18 +8,26 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"

"github.com/tidwall/gjson"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)

const (
maxSizeMB = 2
maxSizeBytes = maxSizeMB * 1024 * 1024
metadataKey = "river:log"
maxSizeMB = 2
maxSizeBytes = maxSizeMB * 1024 * 1024
maxTotalSizeMB = 8
maxTotalBytes = maxTotalSizeMB * 1024 * 1024
// Hard ceiling to prevent pathological allocations from extreme configs.
maxTotalSizeMaxMB = 64
maxTotalBytesMax = maxTotalSizeMaxMB * 1024 * 1024
metadataKey = "river:log"
)

type contextKey struct{}
Expand Down Expand Up @@ -77,6 +85,16 @@ type MiddlewareConfig struct {
//
// Defaults to 2 MB (which is per job attempt).
MaxSizeBytes int

// MaxTotalBytes is the maximum total size of all persisted river logs for a
// job attempt history. If appending the latest attempt would exceed this
// size, oldest log entries are dropped first.
//
// The latest entry is always retained, even if doing so means the resulting
// payload exceeds MaxTotalBytes.
//
// Defaults to 8 MB. Values larger than 64 MB are clamped to 64 MB.
MaxTotalBytes int
}

// NewMiddleware initializes a new Middleware with the given slog handler
Expand Down Expand Up @@ -136,6 +154,7 @@ func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig {
}

config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes)
config.MaxTotalBytes = min(cmp.Or(config.MaxTotalBytes, maxTotalBytes), maxTotalBytesMax)

return config
}
@@ -150,10 +169,7 @@ type metadataWithLog struct {
}

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
var (
existingLogData metadataWithLog
logBuf bytes.Buffer
)
var logBuf bytes.Buffer

switch {
case m.newCustomContext != nil:
@@ -165,46 +181,180 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
return errors.New("expected either newContextLogger or newSlogHandler to be set")
}

if err := json.Unmarshal(job.Metadata, &existingLogData); err != nil {
return err
}

metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !hasMetadataUpdates {
return errors.New("expected to find metadata updates in context, but didn't")
}

// This all runs invariant of whether the job panics or returns an error.
defer func() {
logData := logBuf.String()
logBytes := logBuf.Bytes()

// Return early if nothing ended up getting logged.
if len(logData) < 1 {
if len(logBytes) < 1 {
return
}

// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
// anywhere close to that limit here.
if len(logData) > m.config.MaxSizeBytes {
if len(logBytes) > m.config.MaxSizeBytes {
m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded maximum; truncating",
slog.Int("logs_size", len(logData)),
slog.Int("logs_size", len(logBytes)),
slog.Int("max_size", m.config.MaxSizeBytes),
)
logData = logData[0:m.config.MaxSizeBytes]
logBytes = logBytes[0:m.config.MaxSizeBytes]
}

allLogDataBytes, err := json.Marshal(append(existingLogData.RiverLog, logAttempt{
newLogEntryBytes, err := json.Marshal(logAttempt{
Attempt: job.Attempt,
Log: logData,
}))
Log: string(logBytes),
})
if err != nil {
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
slog.Any("error", err),
)
return
}

allLogDataBytes, numDroppedEntries, err := appendLogDataWithCap(job.Metadata, newLogEntryBytes, m.config.MaxTotalBytes)
if err != nil {
m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
slog.Any("error", err),
)
return
}

if numDroppedEntries > 0 {
m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded total maximum; dropping oldest entries",
slog.Int("max_total_size", m.config.MaxTotalBytes),
slog.Int("num_entries_dropped", numDroppedEntries),
)
}

metadataUpdates[metadataKey] = json.RawMessage(allLogDataBytes)
}()

return doInner(ctx)
}

func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) {
existingLogData := gjson.GetBytes(metadataBytes, metadataKey)
var existingLogArrayBytes []byte
switch {
case !existingLogData.Exists():
existingLogArrayBytes = []byte("[]")
case existingLogData.IsArray():
// Slice raw JSON straight from metadata bytes to avoid an extra copy.
existingLogArrayBytes = metadataBytes[existingLogData.Index : existingLogData.Index+len(existingLogData.Raw)]
default:
return nil, 0, fmt.Errorf("%q value is not an array", metadataKey)
}

existingElementBounds, err := getArrayElementBounds(existingLogArrayBytes)
if err != nil {
return nil, 0, err
}

// Determine the smallest suffix to keep that still fits with the new entry.
// This keeps pruning oldest-first while avoiding repeated full rewrites.
keepStart := getKeepStart(existingElementBounds, len(newLogEntryBytes), maxTotalBytes)

// Build the final array once from the kept suffix plus the new entry.
appendedLogDataBytes := buildAppendedArray(existingLogArrayBytes, existingElementBounds, keepStart, newLogEntryBytes)
numDroppedEntries := min(keepStart, len(existingElementBounds))

return appendedLogDataBytes, numDroppedEntries, nil
}

type arrayElementBounds struct {
Start int
End int
}

func getArrayElementBounds(arrayBytes []byte) ([]arrayElementBounds, error) {
arrResult := gjson.ParseBytes(arrayBytes)
if !arrResult.IsArray() {
return nil, errors.New("expected a JSON array")
}

elements := arrResult.Array()
bounds := make([]arrayElementBounds, len(elements))
for i, elem := range elements {
if elem.Index < 0 {
return nil, errors.New("failed to determine array element index")
}
bounds[i] = arrayElementBounds{
Start: elem.Index,
End: elem.Index + len(elem.Raw),
}
}
return bounds, nil
}

func getKeepStart(bounds []arrayElementBounds, newEntryLen, maxTotalBytes int) int {
if maxTotalBytes <= 0 {
return 0
}

// Keep newest entry even if it's larger than the configured cap.
newOnlyLen := 2 + newEntryLen // `[` + entry + `]`
if newOnlyLen > maxTotalBytes {
return len(bounds)
}

if len(bounds) == 0 {
return 0
}

lastEnd := bounds[len(bounds)-1].End

// Iterate from oldest to newest so we drop the minimum number of entries
// necessary to fit the configured cap.
for keepStart := 0; keepStart <= len(bounds); keepStart++ {
contentLen := newEntryLen
if keepStart < len(bounds) {
// Use actual byte offsets from parsed elements so separator
// whitespace is fully counted toward the cap.
keptContentLen := lastEnd - bounds[keepStart].Start
contentLen += 1 + keptContentLen // comma between kept suffix and new entry
}
totalLen := 2 + contentLen // `[` + content + `]`
if totalLen <= maxTotalBytes {
return keepStart
}
}

return len(bounds)
}

func buildAppendedArray(existingArrayBytes []byte, bounds []arrayElementBounds, keepStart int, newEntryBytes []byte) []byte {
totalLen := 0
maxInt := int(^uint(0) >> 1)

// Preallocation is an optimization only. If length math would overflow,
// fall back to zero-capacity and let append grow as needed.
if keepStart >= len(bounds) {
if len(newEntryBytes) <= maxInt-2 {
totalLen = 2 + len(newEntryBytes)
}
} else {
// Kept suffix is contiguous in the original array bytes, so copy once.
keptContentLen := bounds[len(bounds)-1].End - bounds[keepStart].Start
if keptContentLen >= 0 && len(newEntryBytes) <= maxInt-3 && keptContentLen <= maxInt-3-len(newEntryBytes) {
totalLen = 3 + keptContentLen + len(newEntryBytes) // [] + comma + new entry
}
}

result := make([]byte, 0, totalLen)
result = append(result, '[')

if keepStart < len(bounds) {
result = append(result, existingArrayBytes[bounds[keepStart].Start:bounds[len(bounds)-1].End]...)
result = append(result, ',')
}

result = append(result, newEntryBytes...)
result = append(result, ']')

return result
}
86 changes: 86 additions & 0 deletions riverlog/river_log_benchmark_test.go
@@ -0,0 +1,86 @@
package riverlog

import (
"context"
"encoding/json"
"io"
"log/slog"
"strings"
"testing"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/rivertype"
)

func BenchmarkMiddlewareWorkAppend(b *testing.B) {
if testing.Short() {
b.Skip("skipping benchmark in short mode")
}

middleware := NewMiddleware(func(w io.Writer) slog.Handler {
return slog.NewTextHandler(w, nil)
}, nil)

cases := []struct {
name string
size int
}{
{name: "metadata_256kb", size: 256 * 1024},
{name: "metadata_2mb", size: 2 * 1024 * 1024},
}

for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
metadata := makeMetadataWithLogSize(tc.size)
appendLogLine := strings.Repeat("x", 2048)

b.ReportAllocs()
b.ResetTimer()

for range b.N {
metadataUpdates := map[string]any{}
ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates)
job := &rivertype.JobRow{
Attempt: 1,
Metadata: metadata,
}

if err := middleware.Work(ctx, job, func(ctx context.Context) error {
Logger(ctx).InfoContext(ctx, appendLogLine)
return nil
}); err != nil {
b.Fatalf("work returned error: %v", err)
}

if _, ok := metadataUpdates[metadataKey]; !ok {
b.Fatal("missing river:log metadata update")
}
}
})
}
}

func makeMetadataWithLogSize(targetBytes int) []byte {
if targetBytes <= 0 {
return []byte(`{}`)
}

const perEntryLogSize = 1024
payload := strings.Repeat("x", perEntryLogSize)
numEntries := max(1, targetBytes/perEntryLogSize)

logs := make([]logAttempt, numEntries)
for i := range numEntries {
logs[i] = logAttempt{
Attempt: i + 1,
Log: payload,
}
}

metadataBytes, err := json.Marshal(metadataWithLog{RiverLog: logs})
if err != nil {
panic(err)
}

return metadataBytes
}