Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. Values over 64 MB are clamped to 64 MB. [PR #1157](https://github.com/riverqueue/river/pull/1157).
- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157).
- Reduced snooze-path memory amplification by setting `snoozes` in metadata updates before marshaling, avoiding an extra full-payload JSON rewrite. [PR #1159](https://github.com/riverqueue/river/pull/1159).

## [0.31.0] - 2026-02-21

Expand Down
35 changes: 24 additions & 11 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

"github.com/riverqueue/river/internal/execution"
"github.com/riverqueue/river/internal/hooklookup"
Expand Down Expand Up @@ -345,16 +344,17 @@ func (e *JobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorRe
func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow, res *jobExecutorResult) {
var snoozeErr *rivertype.JobSnoozeError

var (
metadataUpdatesBytes []byte
err error
)
if len(res.MetadataUpdates) > 0 {
metadataUpdatesBytes, err = json.Marshal(res.MetadataUpdates)
marshalMetadataUpdates := func(metadataUpdates map[string]any) ([]byte, error) {
if len(metadataUpdates) == 0 {
return nil, nil
}

metadataUpdatesBytes, err := json.Marshal(metadataUpdates)
if err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error()))
return
return nil, err
}

return metadataUpdatesBytes, nil
}

if res.Err != nil && errors.As(res.Err, &snoozeErr) {
Expand All @@ -366,9 +366,16 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
nextAttemptScheduledAt := time.Now().Add(snoozeErr.Duration)

snoozesValue := gjson.GetBytes(jobRow.Metadata, "snoozes").Int()
metadataUpdatesBytes, err = sjson.SetBytes(metadataUpdatesBytes, "snoozes", snoozesValue+1)
if res.MetadataUpdates == nil {
res.MetadataUpdates = make(map[string]any)
}
// Set snooze count in the metadata map before marshaling so we avoid
// rewriting a potentially large encoded metadata payload.
res.MetadataUpdates["snoozes"] = snoozesValue + 1

metadataUpdatesBytes, err := marshalMetadataUpdates(res.MetadataUpdates)
if err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to set snoozes", slog.String("error", err.Error()))
e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error()))
return
}

Expand All @@ -391,6 +398,12 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
return
}

metadataUpdatesBytes, err := marshalMetadataUpdates(res.MetadataUpdates)
if err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error()))
return
}

if res.Err != nil || res.PanicVal != nil {
e.reportError(ctx, jobRow, res, metadataUpdatesBytes)
return
Expand Down
121 changes: 121 additions & 0 deletions internal/jobexecutor/job_executor_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package jobexecutor

import (
"encoding/json"
"strings"
"testing"

"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)

// BenchmarkMetadataUpdatesBytesForSnooze compares three strategies for
// producing the encoded metadata updates payload on the snooze path of
// JobExecutor.reportResult:
//
//   - MarshalOnly: a single json.Marshal of the updates map (baseline).
//   - MarshalPlusSetSnoozes: marshal, then rewrite the encoded bytes with
//     sjson to add the snooze count (the pre-optimization behavior).
//   - SetSnoozesInMapThenMarshal: set the snooze count in the map first and
//     marshal once (the optimized behavior).
//
// Each case runs against `river:log` payloads of several sizes to show how an
// extra full-payload rewrite amplifies with metadata size.
func BenchmarkMetadataUpdatesBytesForSnooze(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping benchmark in short mode")
	}

	testCases := []struct {
		name        string
		logSizeByte int
	}{
		{name: "Log256KB", logSizeByte: 256 * 1024},
		{name: "Log2MB", logSizeByte: 2 * 1024 * 1024},
		{name: "Log8MB", logSizeByte: 8 * 1024 * 1024},
	}

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			metadataUpdates := map[string]any{
				"river:log": json.RawMessage(makeRiverLogArrayWithApproxSize(tc.logSizeByte)),
			}

			// Simulates an existing job row that's already been snoozed 41
			// times; each benchmark body bumps this the way reportResult does.
			jobMetadata := []byte(`{"snoozes":41}`)
			snoozesValue := gjson.GetBytes(jobMetadata, "snoozes").Int()

			b.Run("MarshalOnly", func(b *testing.B) {
				b.ReportAllocs()
				b.ResetTimer()

				for range b.N {
					marshaled, err := json.Marshal(metadataUpdates)
					if err != nil {
						b.Fatal(err)
					}
					if len(marshaled) == 0 {
						b.Fatal("expected non-empty metadata updates payload")
					}
				}
			})

			b.Run("MarshalPlusSetSnoozes", func(b *testing.B) {
				b.ReportAllocs()
				b.ResetTimer()

				for range b.N {
					marshaled, err := json.Marshal(metadataUpdates)
					if err != nil {
						b.Fatal(err)
					}

					// Matches snooze result handling in JobExecutor.reportResult
					// before the optimization: the full encoded payload is
					// rewritten just to add one small key.
					marshaled, err = sjson.SetBytes(marshaled, "snoozes", snoozesValue+1)
					if err != nil {
						b.Fatal(err)
					}
					if len(marshaled) == 0 {
						b.Fatal("expected non-empty metadata updates payload")
					}
				}
			})

			b.Run("SetSnoozesInMapThenMarshal", func(b *testing.B) {
				// Work on a copy so the "snoozes" key added below doesn't leak
				// into the map shared (via closure capture) with the sibling
				// sub-benchmarks above, keeping all three cases marshaling
				// comparable payloads regardless of sub-benchmark ordering.
				localUpdates := make(map[string]any, len(metadataUpdates)+1)
				for key, value := range metadataUpdates {
					localUpdates[key] = value
				}

				b.ReportAllocs()
				b.ResetTimer()

				for range b.N {
					// Matches the optimized reportResult path where snoozes is
					// added to the map before a single marshal.
					localUpdates["snoozes"] = snoozesValue + 1

					marshaled, err := json.Marshal(localUpdates)
					if err != nil {
						b.Fatal(err)
					}
					if len(marshaled) == 0 {
						b.Fatal("expected non-empty metadata updates payload")
					}
				}
			})
		})
	}
}

// benchmarkLogAttempt is one synthetic entry of a `river:log` array used to
// build large benchmark payloads.
type benchmarkLogAttempt struct {
	Attempt int    `json:"attempt"`
	Log     string `json:"log"`
}

// makeRiverLogArrayWithApproxSize returns a JSON-encoded `river:log` array
// whose total size is approximately targetBytes (1 KiB of log text per entry,
// always at least one entry). A non-positive target yields an empty array.
func makeRiverLogArrayWithApproxSize(targetBytes int) []byte {
	if targetBytes <= 0 {
		return []byte("[]")
	}

	const perEntryLogSize = 1024

	entryCount := targetBytes / perEntryLogSize
	if entryCount < 1 {
		entryCount = 1
	}

	logText := strings.Repeat("x", perEntryLogSize)

	entries := make([]benchmarkLogAttempt, 0, entryCount)
	for attempt := 1; attempt <= entryCount; attempt++ {
		entries = append(entries, benchmarkLogAttempt{Attempt: attempt, Log: logText})
	}

	encoded, err := json.Marshal(entries)
	if err != nil {
		// Marshaling a plain struct slice can't realistically fail; panic is
		// acceptable in a test helper.
		panic(err)
	}

	return encoded
}
177 changes: 177 additions & 0 deletions riverdriver/riverdrivertest/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package riverdrivertest

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -134,4 +137,178 @@ func Benchmark[TTx any](ctx context.Context, b *testing.B,
}
}
})

// Measures JobSetStateIfRunningMany against a job whose stored metadata and
// merged metadata updates each carry a multi-megabyte `river:log` payload,
// exercising the driver's metadata merge path.
b.Run("JobSetStateIfRunningMany_LargeMetadata", func(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping benchmark in short mode")
	}

	exec, schema := setup(ctx, b)

	testCases := []struct {
		name              string
		metadataSizeBytes int
	}{
		{name: "Metadata2MB", metadataSizeBytes: 2 * 1024 * 1024},
		{name: "Metadata8MB", metadataSizeBytes: 8 * 1024 * 1024},
	}

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			largeMetadata := makeBenchmarkMetadataWithRiverLogSize(tc.metadataSizeBytes)
			stateRunning := rivertype.JobStateRunning

			// Insert a single running job carrying the large metadata blob.
			insertedJobs, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{
				Jobs: []*riverdriver.JobInsertFullParams{
					testfactory.Job_Build(b, &testfactory.JobOpts{
						Metadata: largeMetadata,
						State:    &stateRunning,
					}),
				},
				Schema: schema,
			})
			if err != nil {
				b.Fatalf("failed to insert benchmark job: %v", err)
			}
			if len(insertedJobs) != 1 {
				b.Fatalf("expected exactly one inserted job, got %d", len(insertedJobs))
			}

			now := time.Now().UTC()
			// MetadataDoMerge with an equally large update payload forces the
			// query to merge two multi-megabyte JSON documents per call.
			params := &riverdriver.JobSetStateIfRunningManyParams{
				ID:              []int64{insertedJobs[0].ID},
				Attempt:         []*int{nil},
				ErrData:         [][]byte{nil},
				FinalizedAt:     []*time.Time{nil},
				MetadataDoMerge: []bool{true},
				MetadataUpdates: [][]byte{largeMetadata},
				ScheduledAt:     []*time.Time{&now},
				Schema:          schema,
				State:           []rivertype.JobState{rivertype.JobStateScheduled},
			}

			b.ReportAllocs()
			b.ResetTimer()

			// NOTE(review): the first iteration moves the job from `running`
			// to `scheduled`, and the state is never reset between iterations.
			// If the driver's "if running" guard skips non-running jobs, later
			// iterations may measure a cheaper no-op path — confirm against
			// the driver's JobSetStateIfRunningMany semantics.
			for range b.N {
				if _, err := exec.JobSetStateIfRunningMany(ctx, params); err != nil {
					b.Fatalf("failed to update benchmark job: %v", err)
				}
			}
		})
	}
})

// Measures JobGetAvailable fetch cost when the fetched job's metadata carries
// a multi-megabyte `river:log` payload (i.e. the cost of reading large
// metadata back out of the database on every fetch).
b.Run("JobGetAvailable_LargeMetadata", func(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping benchmark in short mode")
	}

	exec, schema := setup(ctx, b)

	testCases := []struct {
		name              string
		metadataSizeBytes int
	}{
		{name: "Metadata2MB", metadataSizeBytes: 2 * 1024 * 1024},
		{name: "Metadata8MB", metadataSizeBytes: 8 * 1024 * 1024},
	}

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			largeMetadata := makeBenchmarkMetadataWithRiverLogSize(tc.metadataSizeBytes)
			now := time.Now().UTC()

			// Insert a single available job carrying the large metadata blob.
			insertedJobs, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{
				Jobs: []*riverdriver.JobInsertFullParams{
					testfactory.Job_Build(b, &testfactory.JobOpts{
						Metadata:    largeMetadata,
						ScheduledAt: &now,
					}),
				},
				Schema: schema,
			})
			if err != nil {
				b.Fatalf("failed to insert benchmark job: %v", err)
			}
			if len(insertedJobs) != 1 {
				b.Fatalf("expected exactly one inserted job, got %d", len(insertedJobs))
			}

			// Raw reset statement interpolates only a server-generated int64
			// job ID and the test schema name, so string building is safe
			// here; a parameterized query would be preferable on principle.
			schemaPrefix := ""
			if schema != "" {
				schemaPrefix = schema + "."
			}
			resetSQL := fmt.Sprintf(
				"UPDATE %sriver_job SET state = 'available', attempt = 0 WHERE id = %d",
				schemaPrefix,
				insertedJobs[0].ID,
			)

			getAvailableParams := &riverdriver.JobGetAvailableParams{
				ClientID:       "bench-client-id",
				MaxAttemptedBy: 100,
				MaxToLock:      1,
				Now:            &now,
				Queue:          insertedJobs[0].Queue,
				Schema:         schema,
			}

			b.ReportAllocs()
			b.ResetTimer()

			// NOTE(review): the reset UPDATE runs inside the timed loop, so
			// its cost is included in the reported ns/op for every iteration.
			for range b.N {
				jobs, err := exec.JobGetAvailable(ctx, getAvailableParams)
				if err != nil {
					b.Fatalf("failed to fetch benchmark job: %v", err)
				}
				if len(jobs) != 1 {
					b.Fatalf("expected exactly one fetched job, got %d", len(jobs))
				}

				if len(jobs[0].Metadata) == 0 {
					b.Fatal("expected non-empty job metadata")
				}

				// Reset job state for the next benchmark iteration without
				// using a RETURNING query that would read metadata again.
				if err := exec.Exec(ctx, resetSQL); err != nil {
					b.Fatalf("failed to reset benchmark job: %v", err)
				}
			}
		})
	}
})
}

// benchmarkLogAttempt is one synthetic `river:log` entry used to build large
// benchmark metadata payloads.
type benchmarkLogAttempt struct {
	Attempt int    `json:"attempt"`
	Log     string `json:"log"`
}

// makeBenchmarkMetadataWithRiverLogSize returns a JSON metadata document whose
// `river:log` array is approximately targetBytes in size (1 KiB of log text
// per entry, always at least one entry). A non-positive target yields an
// empty object.
func makeBenchmarkMetadataWithRiverLogSize(targetBytes int) []byte {
	if targetBytes <= 0 {
		return []byte(`{}`)
	}

	const perEntryLogSize = 1024

	entryCount := targetBytes / perEntryLogSize
	if entryCount < 1 {
		entryCount = 1
	}

	logText := strings.Repeat("x", perEntryLogSize)

	entries := make([]benchmarkLogAttempt, 0, entryCount)
	for attempt := 1; attempt <= entryCount; attempt++ {
		entries = append(entries, benchmarkLogAttempt{Attempt: attempt, Log: logText})
	}

	encoded, err := json.Marshal(map[string]any{"river:log": entries})
	if err != nil {
		// Marshaling a map of plain structs can't realistically fail; panic
		// is acceptable in a benchmark helper.
		panic(err)
	}

	return encoded
}