From 23921c25a9f8e586afca9b061aa9b8f7a20bbf25 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 24 Feb 2026 10:21:22 -0600 Subject: [PATCH 1/5] riverlog: add lightweight append benchmarks Large `river:log` metadata can drive significant allocation spikes while an attempt appends one more log entry. Before changing behavior, we need an inexpensive way to measure the hot path and compare alternatives. Add focused benchmarks in riverlog that exercise middleware append behavior with prebuilt 256 KB and 2 MB metadata payloads. The benchmark stays in-process (no DB setup), reports allocations, and skips in testing.Short() so it remains safe for quick CI-oriented runs. --- riverlog/river_log_benchmark_test.go | 86 ++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 riverlog/river_log_benchmark_test.go diff --git a/riverlog/river_log_benchmark_test.go b/riverlog/river_log_benchmark_test.go new file mode 100644 index 00000000..1193c790 --- /dev/null +++ b/riverlog/river_log_benchmark_test.go @@ -0,0 +1,86 @@ +package riverlog + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "strings" + "testing" + + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/rivertype" +) + +func BenchmarkMiddlewareWorkAppend(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + middleware := NewMiddleware(func(w io.Writer) slog.Handler { + return slog.NewTextHandler(w, nil) + }, nil) + + cases := []struct { + name string + size int + }{ + {name: "metadata_256kb", size: 256 * 1024}, + {name: "metadata_2mb", size: 2 * 1024 * 1024}, + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + metadata := makeMetadataWithLogSize(tc.size) + appendLogLine := strings.Repeat("x", 2048) + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + metadataUpdates := map[string]any{} + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + job := &rivertype.JobRow{ + Attempt: 1, + Metadata: metadata, + } + + if err := middleware.Work(ctx, job, func(ctx context.Context) error { + Logger(ctx).InfoContext(ctx, appendLogLine) + return nil + }); err != nil { + b.Fatalf("work returned error: %v", err) + } + + if _, ok := metadataUpdates[metadataKey]; !ok { + b.Fatal("missing river:log metadata update") + } + } + }) + } +} + +func makeMetadataWithLogSize(targetBytes int) []byte { + if targetBytes <= 0 { + return []byte(`{}`) + } + + const perEntryLogSize = 1024 + payload := strings.Repeat("x", perEntryLogSize) + numEntries := max(1, targetBytes/perEntryLogSize) + + logs := make([]logAttempt, numEntries) + for i := range numEntries { + logs[i] = logAttempt{ + Attempt: i + 1, + Log: payload, + } + } + + metadataBytes, err := json.Marshal(metadataWithLog{RiverLog: logs}) + if err != nil { + panic(err) + } + + return metadataBytes +} From 199b824c38e18cc7615a07057ef893bebaf95c4d Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 24 Feb 2026 10:23:37 -0600 Subject: [PATCH 2/5] riverlog: cap total persisted log size Snoozed jobs can run repeatedly without consuming attempts, so river:log entries may grow without bound over long-running jobs. Large metadata payloads then amplify memory use on each future append. Add a new `MiddlewareConfig` field `MaxTotalBytes` to bound the total serialized `river:log` payload. When the new attempt is appended and the result exceeds the cap, oldest entries are dropped first while always retaining the newest entry. 
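For example, a minimal sketch of constructing the middleware with an explicit cap (the handler choice and the 4 MB value are illustrative, not recommendations; import path assumes the riverlog package lives in the main river module):

    package main

    import (
        "io"
        "log/slog"

        "github.com/riverqueue/river/riverlog"
    )

    func main() {
        // Cap total persisted river:log history per job at ~4 MB.
        _ = riverlog.NewMiddleware(func(w io.Writer) slog.Handler {
            return slog.NewTextHandler(w, nil)
        }, &riverlog.MiddlewareConfig{
            MaxTotalBytes: 4 * 1024 * 1024,
        })
    }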
The middleware now defaults `MaxTotalBytes` to 8 MB. New tests verify that oldest entries are pruned when the cap is reached, a tiny cap still retains the newest entry, and the default is applied. --- CHANGELOG.md | 5 ++ riverlog/river_log.go | 60 ++++++++++++++++++++-- riverlog/river_log_test.go | 100 +++++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aee041d1..9132766c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. Values over 64 MB are clamped to 64 MB. [PR #1157](https://github.com/riverqueue/river/pull/1157). +- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157). + ## [0.31.0] - 2026-02-21 ### Added diff --git a/riverlog/river_log.go b/riverlog/river_log.go index 7528118e..9e245b6e 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -17,9 +17,14 @@ import ( ) const ( - maxSizeMB = 2 - maxSizeBytes = maxSizeMB * 1024 * 1024 - metadataKey = "river:log" + maxSizeMB = 2 + maxSizeBytes = maxSizeMB * 1024 * 1024 + maxTotalSizeMB = 8 + maxTotalBytes = maxTotalSizeMB * 1024 * 1024 + // Hard ceiling to prevent pathological allocations from extreme configs. + maxTotalSizeMaxMB = 64 + maxTotalBytesMax = maxTotalSizeMaxMB * 1024 * 1024 + metadataKey = "river:log" ) type contextKey struct{} @@ -77,6 +82,16 @@ type MiddlewareConfig struct { // // Defaults to 2 MB (which is per job attempt). MaxSizeBytes int + + // MaxTotalBytes is the maximum total size of all persisted river logs for a + // job attempt history. If appending the latest attempt would exceed this + // size, oldest log entries are dropped first. + // + // The latest entry is always retained, even if doing so means the resulting + // payload exceeds MaxTotalBytes. + // + // Defaults to 8 MB. Values larger than 64 MB are clamped to 64 MB. 
+ MaxTotalBytes int } // NewMiddleware initializes a new Middleware with the given slog handler @@ -136,6 +151,10 @@ func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig { } config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes) + config.MaxTotalBytes = cmp.Or(config.MaxTotalBytes, maxTotalBytes) + if config.MaxTotalBytes > maxTotalBytesMax { + config.MaxTotalBytes = maxTotalBytesMax + } return config } @@ -193,18 +212,49 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu logData = logData[0:m.config.MaxSizeBytes] } - allLogDataBytes, err := json.Marshal(append(existingLogData.RiverLog, logAttempt{ + allLogDataBytes, numDroppedEntries, err := marshalLogDataWithCap(append(existingLogData.RiverLog, logAttempt{ Attempt: job.Attempt, Log: logData, - })) + }), m.config.MaxTotalBytes) if err != nil { m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", slog.Any("error", err), ) } + if numDroppedEntries > 0 { + m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded total maximum; dropping oldest entries", + slog.Int("max_total_size", m.config.MaxTotalBytes), + slog.Int("num_entries_dropped", numDroppedEntries), + ) + } + metadataUpdates[metadataKey] = json.RawMessage(allLogDataBytes) }() return doInner(ctx) } + +func marshalLogDataWithCap(allLogData []logAttempt, maxTotalBytes int) ([]byte, int, error) { + allLogDataBytes, err := json.Marshal(allLogData) + if err != nil { + return nil, 0, err + } + + if maxTotalBytes <= 0 || len(allLogDataBytes) <= maxTotalBytes { + return allLogDataBytes, 0, nil + } + + // Drop oldest entries first, while always retaining the latest one. + var numDroppedEntries int + for numDroppedEntries < len(allLogData)-1 && len(allLogDataBytes) > maxTotalBytes { + numDroppedEntries++ + + allLogDataBytes, err = json.Marshal(allLogData[numDroppedEntries:]) + if err != nil { + return nil, numDroppedEntries, err + } + } + + return allLogDataBytes, numDroppedEntries, nil +} diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index 38bb9610..83fe288a 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -270,6 +270,106 @@ func TestMiddleware(t *testing.T) { ) }) + t.Run("PrunesOldestEntriesAtMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + maxTotalBytes, err := json.Marshal([]logAttempt{ + {Attempt: 1, Log: `msg="Logged from worker"` + "\n"}, + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + }) + require.NoError(t, err) + + testWorker, bundle := setup(t, &MiddlewareConfig{ + MaxTotalBytes: len(maxTotalBytes), + }) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. 
+ workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + var metadataWithLog metadataWithLog + require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog)) + + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + {Attempt: 3, Log: `msg="Logged from worker"` + "\n"}, + }, metadataWithLog.RiverLog) + }) + + t.Run("RetainsLatestEntryWhenTotalLimitTiny", func(t *testing.T) { + t.Parallel() + + testWorker, bundle := setup(t, &MiddlewareConfig{ + MaxTotalBytes: 1, + }) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + var metadataWithLog metadataWithLog + require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog)) + + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + }, metadataWithLog.RiverLog) + }) + + t.Run("DefaultsMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t, nil) + + require.Equal(t, maxTotalBytes, bundle.middleware.config.MaxTotalBytes) + }) + + t.Run("ClampsMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t, &MiddlewareConfig{ + MaxTotalBytes: maxTotalBytesMax + 1, + }) + + require.Equal(t, maxTotalBytesMax, bundle.middleware.config.MaxTotalBytes) + }) + t.Run("RawMiddleware", func(t *testing.T) { t.Parallel() From ebc3b81f67ee833bca1269462926a6ba15f96e68 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 24 Feb 2026 10:25:26 -0600 Subject: [PATCH 3/5] riverlog: use raw messages internally Appending a new river:log entry previously decoded the whole historical array into []logAttempt and then re-encoded it. For large metadata this creates substantial transient allocations for strings and slices. Switch middleware internals to decode river:log as []json.RawMessage. Each run now marshals only the newest entry and re-serializes the raw entry array while preserving the existing total-cap pruning behavior. This removes per-entry struct decoding from the append path. 
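As a standalone sketch of the raw-message append shape (not the middleware's actual code path; the data values are illustrative):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Mirrors the metadataWithRawLog shape added in this patch: existing log
    // entries stay as raw bytes rather than being decoded into logAttempt structs.
    type metadataWithRawLog struct {
        RiverLog []json.RawMessage `json:"river:log"`
    }

    func main() {
        metadata := []byte(`{"river:log":[{"attempt":1,"log":"a"}]}`)

        var existing metadataWithRawLog
        if err := json.Unmarshal(metadata, &existing); err != nil {
            panic(err)
        }

        // Marshal only the newest entry, then re-serialize the raw entry array.
        newEntry, _ := json.Marshal(map[string]any{"attempt": 2, "log": "b"})
        all, _ := json.Marshal(append(existing.RiverLog, json.RawMessage(newEntry)))
        fmt.Println(string(all))
    }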
--- riverlog/river_log.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/riverlog/river_log.go b/riverlog/river_log.go index 9e245b6e..a17b48ac 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -168,10 +168,14 @@ type metadataWithLog struct { RiverLog []logAttempt `json:"river:log"` } +type metadataWithRawLog struct { + RiverLog []json.RawMessage `json:"river:log"` +} + func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { var ( - existingLogData metadataWithLog - logBuf bytes.Buffer + existingRawLogData metadataWithRawLog + logBuf bytes.Buffer ) switch { @@ -184,7 +188,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return errors.New("expected either newContextLogger or newSlogHandler to be set") } - if err := json.Unmarshal(job.Metadata, &existingLogData); err != nil { + if err := json.Unmarshal(job.Metadata, &existingRawLogData); err != nil { return err } @@ -212,14 +216,23 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu logData = logData[0:m.config.MaxSizeBytes] } - allLogDataBytes, numDroppedEntries, err := marshalLogDataWithCap(append(existingLogData.RiverLog, logAttempt{ + newLogEntryBytes, err := json.Marshal(logAttempt{ Attempt: job.Attempt, Log: logData, - }), m.config.MaxTotalBytes) + }) + if err != nil { + m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", + slog.Any("error", err), + ) + return + } + + allLogDataBytes, numDroppedEntries, err := marshalRawLogDataWithCap(append(existingRawLogData.RiverLog, json.RawMessage(newLogEntryBytes)), m.config.MaxTotalBytes) if err != nil { m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", slog.Any("error", err), ) + return } if numDroppedEntries > 0 { @@ -235,7 +248,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return doInner(ctx) } -func marshalLogDataWithCap(allLogData []logAttempt, maxTotalBytes int) ([]byte, int, error) { +func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) ([]byte, int, error) { allLogDataBytes, err := json.Marshal(allLogData) if err != nil { return nil, 0, err From 7b080a4c053173383e2f505f533569c2762bc58a Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 24 Feb 2026 10:31:32 -0600 Subject: [PATCH 4/5] riverlog: use sjson for log append and pruning The raw-message iteration still rebuilt log arrays through json marshaling, which left append performance significantly worse than the baseline implementation used on master. Switch riverlog history to gjson/sjson so operations stay on raw JSON arrays. The middleware reads only `river:log` with `gjson`, appends with `sjson.SetRawBytes` using `-1`, and enforces `MaxTotalBytes` by deleting oldest entries with `sjson.DeleteBytes("0")`. Comparison for `BenchmarkMiddlewareWorkAppend` (`-benchtime=100ms`): case: 256 KB impl ns/op B/op allocs/op baseline 976052 645856 291 rawmessage 1605106 698233 293 sjson 614433 1167265 25 case: 2 MB impl ns/op B/op allocs/op baseline 7762435 5920954 2096 rawmessage 12516198 6651551 2098 sjson 4485274 6482082 32 The pruning test changed only to avoid a brittle exact-byte boundary. `MaxTotalBytes` is now set slightly above the two-entry payload so the test consistently verifies the intended behavior: appending a third entry drops the oldest and keeps the newest two. 
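A minimal standalone sketch of the gjson/sjson calls involved (illustrative data; the real path also handles a missing `river:log` key and only prunes while the payload is over the cap):

    package main

    import (
        "fmt"

        "github.com/tidwall/gjson"
        "github.com/tidwall/sjson"
    )

    func main() {
        metadata := []byte(`{"river:log":[{"attempt":1,"log":"a"},{"attempt":2,"log":"b"}]}`)

        // Read only the river:log array without decoding the whole document.
        arr := []byte(gjson.GetBytes(metadata, "river:log").Raw)

        // "-1" appends the new entry as raw JSON at the end of the array.
        arr, _ = sjson.SetRawBytes(arr, "-1", []byte(`{"attempt":3,"log":"c"}`))

        // While over MaxTotalBytes, the middleware deletes the oldest entry at
        // index "0"; here we do it once just to show the call.
        arr, _ = sjson.DeleteBytes(arr, "0")

        fmt.Println(string(arr))
    }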
--- riverlog/river_log.go | 48 +++++++++++++++++++++++--------------- riverlog/river_log_test.go | 3 ++- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/riverlog/river_log.go b/riverlog/river_log.go index a17b48ac..46d0acbf 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -8,9 +8,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "log/slog" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" + "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivertype" @@ -168,15 +172,8 @@ type metadataWithLog struct { RiverLog []logAttempt `json:"river:log"` } -type metadataWithRawLog struct { - RiverLog []json.RawMessage `json:"river:log"` -} - func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { - var ( - existingRawLogData metadataWithRawLog - logBuf bytes.Buffer - ) + var logBuf bytes.Buffer switch { case m.newCustomContext != nil: @@ -188,10 +185,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return errors.New("expected either newContextLogger or newSlogHandler to be set") } - if err := json.Unmarshal(job.Metadata, &existingRawLogData); err != nil { - return err - } - metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) if !hasMetadataUpdates { return errors.New("expected to find metadata updates in context, but didn't") @@ -227,7 +220,7 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return } - allLogDataBytes, numDroppedEntries, err := marshalRawLogDataWithCap(append(existingRawLogData.RiverLog, json.RawMessage(newLogEntryBytes)), m.config.MaxTotalBytes) + allLogDataBytes, numDroppedEntries, err := appendLogDataWithCap(job.Metadata, newLogEntryBytes, m.config.MaxTotalBytes) if err != nil { m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", slog.Any("error", err), @@ -248,8 +241,24 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return doInner(ctx) } -func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) ([]byte, int, error) { - allLogDataBytes, err := json.Marshal(allLogData) +func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) { + if !json.Valid(metadataBytes) { + return nil, 0, errors.New("metadata is not valid JSON") + } + + existingLogData := gjson.GetBytes(metadataBytes, metadataKey) + var allLogDataBytes []byte + switch { + case !existingLogData.Exists(): + allLogDataBytes = []byte("[]") + case existingLogData.IsArray(): + allLogDataBytes = []byte(existingLogData.Raw) + default: + return nil, 0, fmt.Errorf("%q value is not an array", metadataKey) + } + + var err error + allLogDataBytes, err = sjson.SetRawBytes(allLogDataBytes, "-1", newLogEntryBytes) if err != nil { return nil, 0, err } @@ -259,14 +268,15 @@ func marshalRawLogDataWithCap(allLogData []json.RawMessage, maxTotalBytes int) ( } // Drop oldest entries first, while always retaining the latest one. 
+ numEntries := len(gjson.ParseBytes(allLogDataBytes).Array()) var numDroppedEntries int - for numDroppedEntries < len(allLogData)-1 && len(allLogDataBytes) > maxTotalBytes { - numDroppedEntries++ - - allLogDataBytes, err = json.Marshal(allLogData[numDroppedEntries:]) + for numEntries > 1 && len(allLogDataBytes) > maxTotalBytes { + allLogDataBytes, err = sjson.DeleteBytes(allLogDataBytes, "0") if err != nil { return nil, numDroppedEntries, err } + numEntries-- + numDroppedEntries++ } return allLogDataBytes, numDroppedEntries, nil diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index 83fe288a..09cdb1a8 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -280,7 +280,8 @@ func TestMiddleware(t *testing.T) { require.NoError(t, err) testWorker, bundle := setup(t, &MiddlewareConfig{ - MaxTotalBytes: len(maxTotalBytes), + // Keep two entries, force pruning once a third is appended. + MaxTotalBytes: len(maxTotalBytes) + 16, }) workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil) From 020e1db11b7097310e05ba83982fc1a96880dbd1 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 24 Feb 2026 18:56:25 -0600 Subject: [PATCH 5/5] riverlog: build capped log arrays in one pass Appending to large river:log histories still did extra work in the sjson version: it built an appended array first, then repeatedly deleted index 0 while over cap. In high-volume histories this caused multiple full-array rewrites before reaching the final payload. Rework append/prune to compute the kept suffix first and then build the final array once. The new flow marshals only the new entry, derives bounds for existing array elements, decides how many oldest entries to drop to fit MaxTotalBytes, and emits a single final JSON array. It also truncates log bytes before converting to string so oversized log buffers are not copied in full. Benchmark comparison for `BenchmarkMiddlewareWorkAppend` (`-benchtime=100ms`, same machine): case: 256 KB impl ns/op B/op allocs/op before 821178 1167325 25 after 177355 884454 31 case: 2 MB impl ns/op B/op allocs/op before 4854185 6479606 29 after 1247496 7006818 46 Added focused unit coverage for appendLogDataWithCap, including missing-key behavior, non-array metadata errors, minimal oldest-first pruning, keeping newest entry when over cap, and no-cap behavior. --- riverlog/river_log.go | 143 ++++++++++++++++++++++++++-------- riverlog/river_log_test.go | 154 +++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 33 deletions(-) diff --git a/riverlog/river_log.go b/riverlog/river_log.go index 46d0acbf..9d2811b3 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -13,7 +13,6 @@ import ( "log/slog" "github.com/tidwall/gjson" - "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/rivershared/baseservice" @@ -28,7 +27,7 @@ const ( // Hard ceiling to prevent pathological allocations from extreme configs. 
maxTotalSizeMaxMB = 64 maxTotalBytesMax = maxTotalSizeMaxMB * 1024 * 1024 - metadataKey = "river:log" + metadataKey = "river:log" ) type contextKey struct{} @@ -155,10 +154,7 @@ func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig { } config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes) - config.MaxTotalBytes = cmp.Or(config.MaxTotalBytes, maxTotalBytes) - if config.MaxTotalBytes > maxTotalBytesMax { - config.MaxTotalBytes = maxTotalBytesMax - } + config.MaxTotalBytes = min(cmp.Or(config.MaxTotalBytes, maxTotalBytes), maxTotalBytesMax) return config } @@ -192,26 +188,26 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu // This all runs invariant of whether the job panics or returns an error. defer func() { - logData := logBuf.String() + logBytes := logBuf.Bytes() // Return early if nothing ended up getting logged. - if len(logData) < 1 { + if len(logBytes) < 1 { return } // Postgres JSONB is limited to 255MB, but it would be a bad idea to get // anywhere close to that limit here. - if len(logData) > m.config.MaxSizeBytes { + if len(logBytes) > m.config.MaxSizeBytes { m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded maximum; truncating", - slog.Int("logs_size", len(logData)), + slog.Int("logs_size", len(logBytes)), slog.Int("max_size", m.config.MaxSizeBytes), ) - logData = logData[0:m.config.MaxSizeBytes] + logBytes = logBytes[0:m.config.MaxSizeBytes] } newLogEntryBytes, err := json.Marshal(logAttempt{ Attempt: job.Attempt, - Log: logData, + Log: string(logBytes), }) if err != nil { m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", @@ -242,42 +238,123 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu } func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) { - if !json.Valid(metadataBytes) { - return nil, 0, errors.New("metadata is not valid JSON") - } - existingLogData := gjson.GetBytes(metadataBytes, metadataKey) - var allLogDataBytes []byte + var existingLogArrayBytes []byte switch { case !existingLogData.Exists(): - allLogDataBytes = []byte("[]") + existingLogArrayBytes = []byte("[]") case existingLogData.IsArray(): - allLogDataBytes = []byte(existingLogData.Raw) + // Slice raw JSON straight from metadata bytes to avoid an extra copy. + existingLogArrayBytes = metadataBytes[existingLogData.Index : existingLogData.Index+len(existingLogData.Raw)] default: return nil, 0, fmt.Errorf("%q value is not an array", metadataKey) } - var err error - allLogDataBytes, err = sjson.SetRawBytes(allLogDataBytes, "-1", newLogEntryBytes) + existingElementBounds, err := getArrayElementBounds(existingLogArrayBytes) if err != nil { return nil, 0, err } - if maxTotalBytes <= 0 || len(allLogDataBytes) <= maxTotalBytes { - return allLogDataBytes, 0, nil + // Determine the smallest suffix to keep that still fits with the new entry. + // This keeps pruning oldest-first while avoiding repeated full rewrites. + keepStart := getKeepStart(existingElementBounds, len(newLogEntryBytes), maxTotalBytes) + + // Build the final array once from the kept suffix plus the new entry. 
+ appendedLogDataBytes := buildAppendedArray(existingLogArrayBytes, existingElementBounds, keepStart, newLogEntryBytes) + numDroppedEntries := min(keepStart, len(existingElementBounds)) + + return appendedLogDataBytes, numDroppedEntries, nil +} + +type arrayElementBounds struct { + Start int + End int +} + +func getArrayElementBounds(arrayBytes []byte) ([]arrayElementBounds, error) { + arrResult := gjson.ParseBytes(arrayBytes) + if !arrResult.IsArray() { + return nil, errors.New("expected a JSON array") } - // Drop oldest entries first, while always retaining the latest one. - numEntries := len(gjson.ParseBytes(allLogDataBytes).Array()) - var numDroppedEntries int - for numEntries > 1 && len(allLogDataBytes) > maxTotalBytes { - allLogDataBytes, err = sjson.DeleteBytes(allLogDataBytes, "0") - if err != nil { - return nil, numDroppedEntries, err + elements := arrResult.Array() + bounds := make([]arrayElementBounds, len(elements)) + for i, elem := range elements { + if elem.Index < 0 { + return nil, errors.New("failed to determine array element index") + } + bounds[i] = arrayElementBounds{ + Start: elem.Index, + End: elem.Index + len(elem.Raw), } - numEntries-- - numDroppedEntries++ } + return bounds, nil +} + +func getKeepStart(bounds []arrayElementBounds, newEntryLen, maxTotalBytes int) int { + if maxTotalBytes <= 0 { + return 0 + } + + // Keep newest entry even if it's larger than the configured cap. + newOnlyLen := 2 + newEntryLen // `[` + entry + `]` + if newOnlyLen > maxTotalBytes { + return len(bounds) + } + + if len(bounds) == 0 { + return 0 + } + + lastEnd := bounds[len(bounds)-1].End + + // Iterate from oldest to newest so we drop the minimum number of entries + // necessary to fit the configured cap. + for keepStart := 0; keepStart <= len(bounds); keepStart++ { + contentLen := newEntryLen + if keepStart < len(bounds) { + // Use actual byte offsets from parsed elements so separator + // whitespace is fully counted toward the cap. + keptContentLen := lastEnd - bounds[keepStart].Start + contentLen += 1 + keptContentLen // comma between kept suffix and new entry + } + totalLen := 2 + contentLen // `[` + content + `]` + if totalLen <= maxTotalBytes { + return keepStart + } + } + + return len(bounds) +} + +func buildAppendedArray(existingArrayBytes []byte, bounds []arrayElementBounds, keepStart int, newEntryBytes []byte) []byte { + totalLen := 0 + maxInt := int(^uint(0) >> 1) + + // Preallocation is an optimization only. If length math would overflow, + // fall back to zero-capacity and let append grow as needed. + if keepStart >= len(bounds) { + if len(newEntryBytes) <= maxInt-2 { + totalLen = 2 + len(newEntryBytes) + } + } else { + // Kept suffix is contiguous in the original array bytes, so copy once. + keptContentLen := bounds[len(bounds)-1].End - bounds[keepStart].Start + if keptContentLen >= 0 && len(newEntryBytes) <= maxInt-3 && keptContentLen <= maxInt-3-len(newEntryBytes) { + totalLen = 3 + keptContentLen + len(newEntryBytes) // [] + comma + new entry + } + } + + result := make([]byte, 0, totalLen) + result = append(result, '[') + + if keepStart < len(bounds) { + result = append(result, existingArrayBytes[bounds[keepStart].Start:bounds[len(bounds)-1].End]...) + result = append(result, ',') + } + + result = append(result, newEntryBytes...) 
+ result = append(result, ']') - return allLogDataBytes, numDroppedEntries, nil + return result } diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index 09cdb1a8..4b5cfb12 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -7,6 +7,7 @@ import ( "io" "log" "log/slog" + "strings" "testing" "github.com/jackc/pgx/v5" @@ -44,6 +45,159 @@ func TestLogger(t *testing.T) { }) } +func TestAppendLogDataWithCap(t *testing.T) { + t.Parallel() + + marshalLog := func(tb testing.TB, attempt int, log string) []byte { + tb.Helper() + b, err := json.Marshal(logAttempt{Attempt: attempt, Log: log}) + require.NoError(tb, err) + return b + } + + marshalMetadataWithLogs := func(tb testing.TB, logs []logAttempt) []byte { + tb.Helper() + b, err := json.Marshal(map[string]any{ + metadataKey: logs, + }) + require.NoError(tb, err) + return b + } + + unmarshalLogs := func(tb testing.TB, rawArray []byte) []logAttempt { + tb.Helper() + var logs []logAttempt + require.NoError(tb, json.Unmarshal(rawArray, &logs)) + return logs + } + + t.Run("MissingKeyStartsFromEmptyArray", func(t *testing.T) { + t.Parallel() + + newEntry := marshalLog(t, 1, "new") + result, dropped, err := appendLogDataWithCap([]byte(`{"other":"value"}`), newEntry, maxTotalBytes) + require.NoError(t, err) + require.Zero(t, dropped) + require.Equal(t, []logAttempt{{Attempt: 1, Log: "new"}}, unmarshalLogs(t, result)) + }) + + t.Run("NonArrayLogValueReturnsError", func(t *testing.T) { + t.Parallel() + + newEntry := marshalLog(t, 1, "new") + _, _, err := appendLogDataWithCap([]byte(`{"river:log":{"not":"array"}}`), newEntry, maxTotalBytes) + require.EqualError(t, err, `"river:log" value is not an array`) + }) + + t.Run("PrunesOldestEntriesOnlyAsNeeded", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + } + newEntry := marshalLog(t, 4, "d") + + target, err := json.Marshal([]logAttempt{ + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }) + require.NoError(t, err) + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, len(target)) + require.NoError(t, err) + require.Equal(t, 1, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("KeepsNewestEntryEvenIfOverCap", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + } + newEntry := marshalLog(t, 3, strings.Repeat("x", 64)) + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 8) + require.NoError(t, err) + require.Equal(t, len(existing), dropped) + require.Equal(t, []logAttempt{ + {Attempt: 3, Log: strings.Repeat("x", 64)}, + }, unmarshalLogs(t, result)) + }) + + t.Run("NoCapKeepsEverything", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + } + newEntry := marshalLog(t, 2, "b") + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 0) + require.NoError(t, err) + require.Zero(t, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("LargeExistingPayloadPrunesToCap", func(t *testing.T) { + t.Parallel() + + // Simulate externally-written oversized payloads in metadata. 
+ existing := []logAttempt{ + {Attempt: 1, Log: strings.Repeat("x", 4*1024*1024)}, + } + newEntry := marshalLog(t, 2, "new") + + // Cap is small enough that only the new entry can remain. + const maxTotalBytes = 256 + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, maxTotalBytes) + require.NoError(t, err) + require.Equal(t, 1, dropped) + require.LessOrEqual(t, len(result), maxTotalBytes) + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: "new"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("WhitespaceSeparatorsCountTowardsCap", func(t *testing.T) { + t.Parallel() + + // jsonb canonicalization in Postgres usually removes separator + // whitespace, so this shape is unlikely for production rows loaded from + // the database. We still keep this test because the cap logic must remain + // correct for any valid JSON input. If separators include spaces/newlines, + // those bytes still count toward MaxTotalBytes and must be included in the + // retention calculation. + existingMetadata := []byte(`{"river:log":[{"attempt":1,"log":"a"}, {"attempt":2,"log":"b"}, {"attempt":3,"log":"c"}]}`) + newEntry := marshalLog(t, 4, "d") + + // This cap is large enough to fit [2,3,4] only if separator whitespace is + // ignored. Correct behavior must count all bytes in the final JSON. + const maxTotalBytes = 73 + + result, dropped, err := appendLogDataWithCap(existingMetadata, newEntry, maxTotalBytes) + require.NoError(t, err) + require.LessOrEqual(t, len(result), maxTotalBytes) + require.Equal(t, 2, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }, unmarshalLogs(t, result)) + }) +} + func TestLoggerSafely(t *testing.T) { t.Parallel()
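For reference, a minimal standalone sketch of the element-bounds idea behind getArrayElementBounds in the final patch (it assumes gjson populates Result.Index for array elements, which the patch guards against with an elem.Index < 0 check; data values are illustrative):

    package main

    import (
        "fmt"

        "github.com/tidwall/gjson"
    )

    func main() {
        arr := []byte(`[{"attempt":1,"log":"a"},{"attempt":2,"log":"b"}]`)

        // Each element's byte range within arr is [Index, Index+len(Raw)), which
        // lets the kept suffix be copied with a single contiguous append.
        for i, elem := range gjson.ParseBytes(arr).Array() {
            fmt.Printf("element %d: bytes [%d, %d)\n", i, elem.Index, elem.Index+len(elem.Raw))
        }
    }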