From 9f8274622b1fd77132450a385fca69e599a1c180 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 25 Feb 2026 09:07:49 -0600 Subject: [PATCH] optimize snooze metadata updates and add benches Snoozed jobs with large `river:log` payloads were still paying an extra full-blob rewrite in `jobexecutor.reportResult`. The code marshaled `MetadataUpdates` and then used `sjson.SetBytes` to increment `snoozes`, which re-encoded the entire JSON payload. Change snooze handling to set `snoozes` directly in the `res.MetadataUpdates` map before marshaling. This keeps behavior the same while avoiding the second large copy. The method now funnels marshal logic through a small helper used by both snooze and non-snooze paths. Add focused benchmarks for this hot path, including 256 KB, 2 MB, and 8 MB `river:log` cases with both old-style and optimized approaches. Extend driver benchmarks with large metadata cases for `JobSetStateIfRunningMany` and `JobGetAvailable` to track driver-side amplification in fetch and update paths. --- CHANGELOG.md | 1 + internal/jobexecutor/job_executor.go | 35 ++-- .../job_executor_benchmark_test.go | 121 ++++++++++++ riverdriver/riverdrivertest/benchmark.go | 177 ++++++++++++++++++ 4 files changed, 323 insertions(+), 11 deletions(-) create mode 100644 internal/jobexecutor/job_executor_benchmark_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9132766c..e6f5a1e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. Values over 64 MB are clamped to 64 MB. [PR #1157](https://github.com/riverqueue/river/pull/1157). 
- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157). +- Reduced snooze-path memory amplification by setting `snoozes` in metadata updates before marshaling, avoiding an extra full-payload JSON rewrite. [PR #1159](https://github.com/riverqueue/river/pull/1159). ## [0.31.0] - 2026-02-21 diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index 588902fa..d3bd5503 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -12,7 +12,6 @@ import ( "time" "github.com/tidwall/gjson" - "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/execution" "github.com/riverqueue/river/internal/hooklookup" @@ -345,16 +344,17 @@ func (e *JobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorRe func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow, res *jobExecutorResult) { var snoozeErr *rivertype.JobSnoozeError - var ( - metadataUpdatesBytes []byte - err error - ) - if len(res.MetadataUpdates) > 0 { - metadataUpdatesBytes, err = json.Marshal(res.MetadataUpdates) + marshalMetadataUpdates := func(metadataUpdates map[string]any) ([]byte, error) { + if len(metadataUpdates) == 0 { + return nil, nil + } + + metadataUpdatesBytes, err := json.Marshal(metadataUpdates) if err != nil { - e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error())) - return + return nil, err } + + return metadataUpdatesBytes, nil } if res.Err != nil && errors.As(res.Err, &snoozeErr) { @@ -366,9 +366,16 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow nextAttemptScheduledAt := time.Now().Add(snoozeErr.Duration) snoozesValue := gjson.GetBytes(jobRow.Metadata, "snoozes").Int() - metadataUpdatesBytes, err = sjson.SetBytes(metadataUpdatesBytes, "snoozes", snoozesValue+1) + if 
res.MetadataUpdates == nil { + res.MetadataUpdates = make(map[string]any) + } + // Set snooze count in the metadata map before marshaling so we avoid + // rewriting a potentially large encoded metadata payload. + res.MetadataUpdates["snoozes"] = snoozesValue + 1 + + metadataUpdatesBytes, err := marshalMetadataUpdates(res.MetadataUpdates) if err != nil { - e.Logger.ErrorContext(ctx, e.Name+": Failed to set snoozes", slog.String("error", err.Error())) + e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error())) return } @@ -391,6 +398,12 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow return } + metadataUpdatesBytes, err := marshalMetadataUpdates(res.MetadataUpdates) + if err != nil { + e.Logger.ErrorContext(ctx, e.Name+": Failed to marshal metadata updates", slog.String("error", err.Error())) + return + } + if res.Err != nil || res.PanicVal != nil { e.reportError(ctx, jobRow, res, metadataUpdatesBytes) return diff --git a/internal/jobexecutor/job_executor_benchmark_test.go b/internal/jobexecutor/job_executor_benchmark_test.go new file mode 100644 index 00000000..29b3d282 --- /dev/null +++ b/internal/jobexecutor/job_executor_benchmark_test.go @@ -0,0 +1,121 @@ +package jobexecutor + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +func BenchmarkMetadataUpdatesBytesForSnooze(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + testCases := []struct { + name string + logSizeByte int + }{ + {name: "Log256KB", logSizeByte: 256 * 1024}, + {name: "Log2MB", logSizeByte: 2 * 1024 * 1024}, + {name: "Log8MB", logSizeByte: 8 * 1024 * 1024}, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + metadataUpdates := map[string]any{ + "river:log": json.RawMessage(makeRiverLogArrayWithApproxSize(tc.logSizeByte)), + } + + jobMetadata := []byte(`{"snoozes":41}`) + 
snoozesValue := gjson.GetBytes(jobMetadata, "snoozes").Int() + + b.Run("MarshalOnly", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + marshaled, err := json.Marshal(metadataUpdates) + if err != nil { + b.Fatal(err) + } + if len(marshaled) == 0 { + b.Fatal("expected non-empty metadata updates payload") + } + } + }) + + b.Run("MarshalPlusSetSnoozes", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + marshaled, err := json.Marshal(metadataUpdates) + if err != nil { + b.Fatal(err) + } + + // Mirrors the previous (pre-optimization) snooze handling in JobExecutor.reportResult, kept as a baseline. + marshaled, err = sjson.SetBytes(marshaled, "snoozes", snoozesValue+1) + if err != nil { + b.Fatal(err) + } + if len(marshaled) == 0 { + b.Fatal("expected non-empty metadata updates payload") + } + } + }) + + b.Run("SetSnoozesInMapThenMarshal", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + // Matches the optimized reportResult path where snoozes is + // added to the map before a single marshal. 
+ metadataUpdates["snoozes"] = snoozesValue + 1 + + marshaled, err := json.Marshal(metadataUpdates) + if err != nil { + b.Fatal(err) + } + if len(marshaled) == 0 { + b.Fatal("expected non-empty metadata updates payload") + } + } + }) + }) + } +} + +type benchmarkLogAttempt struct { + Attempt int `json:"attempt"` + Log string `json:"log"` +} + +func makeRiverLogArrayWithApproxSize(targetBytes int) []byte { + if targetBytes <= 0 { + return []byte("[]") + } + + const perEntryLogSize = 1024 + payload := strings.Repeat("x", perEntryLogSize) + numEntries := max(1, targetBytes/perEntryLogSize) + + logs := make([]benchmarkLogAttempt, numEntries) + for i := range numEntries { + logs[i] = benchmarkLogAttempt{ + Attempt: i + 1, + Log: payload, + } + } + + arrayBytes, err := json.Marshal(logs) + if err != nil { + panic(err) + } + + return arrayBytes +} diff --git a/riverdriver/riverdrivertest/benchmark.go b/riverdriver/riverdrivertest/benchmark.go index d18b2cdb..49bb8f23 100644 --- a/riverdriver/riverdrivertest/benchmark.go +++ b/riverdriver/riverdrivertest/benchmark.go @@ -2,6 +2,9 @@ package riverdrivertest import ( "context" + "encoding/json" + "fmt" + "strings" "testing" "time" @@ -134,4 +137,178 @@ func Benchmark[TTx any](ctx context.Context, b *testing.B, } } }) + + b.Run("JobSetStateIfRunningMany_LargeMetadata", func(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + exec, schema := setup(ctx, b) + + testCases := []struct { + name string + metadataSizeBytes int + }{ + {name: "Metadata2MB", metadataSizeBytes: 2 * 1024 * 1024}, + {name: "Metadata8MB", metadataSizeBytes: 8 * 1024 * 1024}, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + largeMetadata := makeBenchmarkMetadataWithRiverLogSize(tc.metadataSizeBytes) + stateRunning := rivertype.JobStateRunning + + insertedJobs, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{ + Jobs: []*riverdriver.JobInsertFullParams{ + 
testfactory.Job_Build(b, &testfactory.JobOpts{ + Metadata: largeMetadata, + State: &stateRunning, + }), + }, + Schema: schema, + }) + if err != nil { + b.Fatalf("failed to insert benchmark job: %v", err) + } + if len(insertedJobs) != 1 { + b.Fatalf("expected exactly one inserted job, got %d", len(insertedJobs)) + } + + now := time.Now().UTC() + params := &riverdriver.JobSetStateIfRunningManyParams{ + ID: []int64{insertedJobs[0].ID}, + Attempt: []*int{nil}, + ErrData: [][]byte{nil}, + FinalizedAt: []*time.Time{nil}, + MetadataDoMerge: []bool{true}, + MetadataUpdates: [][]byte{largeMetadata}, + ScheduledAt: []*time.Time{&now}, + Schema: schema, + State: []rivertype.JobState{rivertype.JobStateScheduled}, + } + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + if _, err := exec.JobSetStateIfRunningMany(ctx, params); err != nil { + b.Fatalf("failed to update benchmark job: %v", err) + } + } + }) + } + }) + + b.Run("JobGetAvailable_LargeMetadata", func(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + exec, schema := setup(ctx, b) + + testCases := []struct { + name string + metadataSizeBytes int + }{ + {name: "Metadata2MB", metadataSizeBytes: 2 * 1024 * 1024}, + {name: "Metadata8MB", metadataSizeBytes: 8 * 1024 * 1024}, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + largeMetadata := makeBenchmarkMetadataWithRiverLogSize(tc.metadataSizeBytes) + now := time.Now().UTC() + + insertedJobs, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{ + Jobs: []*riverdriver.JobInsertFullParams{ + testfactory.Job_Build(b, &testfactory.JobOpts{ + Metadata: largeMetadata, + ScheduledAt: &now, + }), + }, + Schema: schema, + }) + if err != nil { + b.Fatalf("failed to insert benchmark job: %v", err) + } + if len(insertedJobs) != 1 { + b.Fatalf("expected exactly one inserted job, got %d", len(insertedJobs)) + } + + schemaPrefix := "" + if schema != "" { + schemaPrefix = schema + "." 
+ } + resetSQL := fmt.Sprintf( + "UPDATE %sriver_job SET state = 'available', attempt = 0 WHERE id = %d", + schemaPrefix, + insertedJobs[0].ID, + ) + + getAvailableParams := &riverdriver.JobGetAvailableParams{ + ClientID: "bench-client-id", + MaxAttemptedBy: 100, + MaxToLock: 1, + Now: &now, + Queue: insertedJobs[0].Queue, + Schema: schema, + } + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + jobs, err := exec.JobGetAvailable(ctx, getAvailableParams) + if err != nil { + b.Fatalf("failed to fetch benchmark job: %v", err) + } + if len(jobs) != 1 { + b.Fatalf("expected exactly one fetched job, got %d", len(jobs)) + } + + if len(jobs[0].Metadata) == 0 { + b.Fatal("expected non-empty job metadata") + } + + // Reset job state for the next benchmark iteration without + // using a RETURNING query that would read metadata again. + if err := exec.Exec(ctx, resetSQL); err != nil { + b.Fatalf("failed to reset benchmark job: %v", err) + } + } + }) + } + }) +} + +type benchmarkLogAttempt struct { + Attempt int `json:"attempt"` + Log string `json:"log"` +} + +func makeBenchmarkMetadataWithRiverLogSize(targetBytes int) []byte { + if targetBytes <= 0 { + return []byte(`{}`) + } + + const perEntryLogSize = 1024 + payload := strings.Repeat("x", perEntryLogSize) + numEntries := max(1, targetBytes/perEntryLogSize) + + logs := make([]benchmarkLogAttempt, numEntries) + for i := range numEntries { + logs[i] = benchmarkLogAttempt{ + Attempt: i + 1, + Log: payload, + } + } + + metadataBytes, err := json.Marshal(map[string]any{ + "river:log": logs, + }) + if err != nil { + panic(err) + } + + return metadataBytes }