diff --git a/CHANGELOG.md b/CHANGELOG.md index aee041d1..9132766c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `riverlog.Middleware` now supports `MiddlewareConfig.MaxTotalBytes` (default 8 MB) to cap total persisted `river:log` history per job. When the cap is exceeded, oldest log entries are dropped first while retaining the newest entry. Values over 64 MB are clamped to 64 MB. [PR #1157](https://github.com/riverqueue/river/pull/1157). +- Improved `riverlog` performance and reduced memory amplification when appending to large persisted `river:log` histories. [PR #1157](https://github.com/riverqueue/river/pull/1157). + ## [0.31.0] - 2026-02-21 ### Added diff --git a/riverlog/river_log.go b/riverlog/river_log.go index 7528118e..9d2811b3 100644 --- a/riverlog/river_log.go +++ b/riverlog/river_log.go @@ -8,18 +8,26 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "log/slog" + "github.com/tidwall/gjson" + "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivertype" ) const ( - maxSizeMB = 2 - maxSizeBytes = maxSizeMB * 1024 * 1024 - metadataKey = "river:log" + maxSizeMB = 2 + maxSizeBytes = maxSizeMB * 1024 * 1024 + maxTotalSizeMB = 8 + maxTotalBytes = maxTotalSizeMB * 1024 * 1024 + // Hard ceiling to prevent pathological allocations from extreme configs. + maxTotalSizeMaxMB = 64 + maxTotalBytesMax = maxTotalSizeMaxMB * 1024 * 1024 + metadataKey = "river:log" ) type contextKey struct{} @@ -77,6 +85,16 @@ type MiddlewareConfig struct { // // Defaults to 2 MB (which is per job attempt). MaxSizeBytes int + + // MaxTotalBytes is the maximum total size of all persisted river logs for a + // job attempt history. If appending the latest attempt would exceed this + // size, oldest log entries are dropped first. + // + // The latest entry is always retained, even if doing so means the resulting + // payload exceeds MaxTotalBytes. + // + // Defaults to 8 MB. Values larger than 64 MB are clamped to 64 MB. + MaxTotalBytes int } // NewMiddleware initializes a new Middleware with the given slog handler @@ -136,6 +154,7 @@ func defaultConfig(config *MiddlewareConfig) *MiddlewareConfig { } config.MaxSizeBytes = cmp.Or(config.MaxSizeBytes, maxSizeBytes) + config.MaxTotalBytes = min(cmp.Or(config.MaxTotalBytes, maxTotalBytes), maxTotalBytesMax) return config } @@ -150,10 +169,7 @@ type metadataWithLog struct { } func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { - var ( - existingLogData metadataWithLog - logBuf bytes.Buffer - ) + var logBuf bytes.Buffer switch { case m.newCustomContext != nil: @@ -165,10 +181,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return errors.New("expected either newContextLogger or newSlogHandler to be set") } - if err := json.Unmarshal(job.Metadata, &existingLogData); err != nil { - return err - } - metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) if !hasMetadataUpdates { return errors.New("expected to find metadata updates in context, but didn't") @@ -176,31 +188,47 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu // This all runs invariant of whether the job panics or returns an error. defer func() { - logData := logBuf.String() + logBytes := logBuf.Bytes() // Return early if nothing ended up getting logged. - if len(logData) < 1 { + if len(logBytes) < 1 { return } // Postgres JSONB is limited to 255MB, but it would be a bad idea to get // anywhere close to that limit here. - if len(logData) > m.config.MaxSizeBytes { + if len(logBytes) > m.config.MaxSizeBytes { m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded maximum; truncating", - slog.Int("logs_size", len(logData)), + slog.Int("logs_size", len(logBytes)), slog.Int("max_size", m.config.MaxSizeBytes), ) - logData = logData[0:m.config.MaxSizeBytes] + logBytes = logBytes[0:m.config.MaxSizeBytes] } - allLogDataBytes, err := json.Marshal(append(existingLogData.RiverLog, logAttempt{ + newLogEntryBytes, err := json.Marshal(logAttempt{ Attempt: job.Attempt, - Log: logData, - })) + Log: string(logBytes), + }) if err != nil { m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", slog.Any("error", err), ) + return + } + + allLogDataBytes, numDroppedEntries, err := appendLogDataWithCap(job.Metadata, newLogEntryBytes, m.config.MaxTotalBytes) + if err != nil { + m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data", + slog.Any("error", err), + ) + return + } + + if numDroppedEntries > 0 { + m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded total maximum; dropping oldest entries", + slog.Int("max_total_size", m.config.MaxTotalBytes), + slog.Int("num_entries_dropped", numDroppedEntries), + ) } metadataUpdates[metadataKey] = json.RawMessage(allLogDataBytes) @@ -208,3 +236,125 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu return doInner(ctx) } + +func appendLogDataWithCap(metadataBytes, newLogEntryBytes []byte, maxTotalBytes int) ([]byte, int, error) { + existingLogData := gjson.GetBytes(metadataBytes, metadataKey) + var existingLogArrayBytes []byte + switch { + case !existingLogData.Exists(): + existingLogArrayBytes = []byte("[]") + case existingLogData.IsArray(): + // Slice raw JSON straight from metadata bytes to avoid an extra copy. + existingLogArrayBytes = metadataBytes[existingLogData.Index : existingLogData.Index+len(existingLogData.Raw)] + default: + return nil, 0, fmt.Errorf("%q value is not an array", metadataKey) + } + + existingElementBounds, err := getArrayElementBounds(existingLogArrayBytes) + if err != nil { + return nil, 0, err + } + + // Determine the smallest suffix to keep that still fits with the new entry. + // This keeps pruning oldest-first while avoiding repeated full rewrites. + keepStart := getKeepStart(existingElementBounds, len(newLogEntryBytes), maxTotalBytes) + + // Build the final array once from the kept suffix plus the new entry. + appendedLogDataBytes := buildAppendedArray(existingLogArrayBytes, existingElementBounds, keepStart, newLogEntryBytes) + numDroppedEntries := min(keepStart, len(existingElementBounds)) + + return appendedLogDataBytes, numDroppedEntries, nil +} + +type arrayElementBounds struct { + Start int + End int +} + +func getArrayElementBounds(arrayBytes []byte) ([]arrayElementBounds, error) { + arrResult := gjson.ParseBytes(arrayBytes) + if !arrResult.IsArray() { + return nil, errors.New("expected a JSON array") + } + + elements := arrResult.Array() + bounds := make([]arrayElementBounds, len(elements)) + for i, elem := range elements { + if elem.Index < 0 { + return nil, errors.New("failed to determine array element index") + } + bounds[i] = arrayElementBounds{ + Start: elem.Index, + End: elem.Index + len(elem.Raw), + } + } + return bounds, nil +} + +func getKeepStart(bounds []arrayElementBounds, newEntryLen, maxTotalBytes int) int { + if maxTotalBytes <= 0 { + return 0 + } + + // Keep newest entry even if it's larger than the configured cap. + newOnlyLen := 2 + newEntryLen // `[` + entry + `]` + if newOnlyLen > maxTotalBytes { + return len(bounds) + } + + if len(bounds) == 0 { + return 0 + } + + lastEnd := bounds[len(bounds)-1].End + + // Iterate from oldest to newest so we drop the minimum number of entries + // necessary to fit the configured cap. + for keepStart := 0; keepStart <= len(bounds); keepStart++ { + contentLen := newEntryLen + if keepStart < len(bounds) { + // Use actual byte offsets from parsed elements so separator + // whitespace is fully counted toward the cap. + keptContentLen := lastEnd - bounds[keepStart].Start + contentLen += 1 + keptContentLen // comma between kept suffix and new entry + } + totalLen := 2 + contentLen // `[` + content + `]` + if totalLen <= maxTotalBytes { + return keepStart + } + } + + return len(bounds) +} + +func buildAppendedArray(existingArrayBytes []byte, bounds []arrayElementBounds, keepStart int, newEntryBytes []byte) []byte { + totalLen := 0 + maxInt := int(^uint(0) >> 1) + + // Preallocation is an optimization only. If length math would overflow, + // fall back to zero-capacity and let append grow as needed. + if keepStart >= len(bounds) { + if len(newEntryBytes) <= maxInt-2 { + totalLen = 2 + len(newEntryBytes) + } + } else { + // Kept suffix is contiguous in the original array bytes, so copy once. + keptContentLen := bounds[len(bounds)-1].End - bounds[keepStart].Start + if keptContentLen >= 0 && len(newEntryBytes) <= maxInt-3 && keptContentLen <= maxInt-3-len(newEntryBytes) { + totalLen = 3 + keptContentLen + len(newEntryBytes) // [] + comma + new entry + } + } + + result := make([]byte, 0, totalLen) + result = append(result, '[') + + if keepStart < len(bounds) { + result = append(result, existingArrayBytes[bounds[keepStart].Start:bounds[len(bounds)-1].End]...) + result = append(result, ',') + } + + result = append(result, newEntryBytes...) + result = append(result, ']') + + return result +} diff --git a/riverlog/river_log_benchmark_test.go b/riverlog/river_log_benchmark_test.go new file mode 100644 index 00000000..1193c790 --- /dev/null +++ b/riverlog/river_log_benchmark_test.go @@ -0,0 +1,86 @@ +package riverlog + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "strings" + "testing" + + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/rivertype" +) + +func BenchmarkMiddlewareWorkAppend(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + middleware := NewMiddleware(func(w io.Writer) slog.Handler { + return slog.NewTextHandler(w, nil) + }, nil) + + cases := []struct { + name string + size int + }{ + {name: "metadata_256kb", size: 256 * 1024}, + {name: "metadata_2mb", size: 2 * 1024 * 1024}, + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + metadata := makeMetadataWithLogSize(tc.size) + appendLogLine := strings.Repeat("x", 2048) + + b.ReportAllocs() + b.ResetTimer() + + for range b.N { + metadataUpdates := map[string]any{} + ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + job := &rivertype.JobRow{ + Attempt: 1, + Metadata: metadata, + } + + if err := middleware.Work(ctx, job, func(ctx context.Context) error { + Logger(ctx).InfoContext(ctx, appendLogLine) + return nil + }); err != nil { + b.Fatalf("work returned error: %v", err) + } + + if _, ok := metadataUpdates[metadataKey]; !ok { + b.Fatal("missing river:log metadata update") + } + } + }) + } +} + +func makeMetadataWithLogSize(targetBytes int) []byte { + if targetBytes <= 0 { + return []byte(`{}`) + } + + const perEntryLogSize = 1024 + payload := strings.Repeat("x", perEntryLogSize) + numEntries := max(1, targetBytes/perEntryLogSize) + + logs := make([]logAttempt, numEntries) + for i := range numEntries { + logs[i] = logAttempt{ + Attempt: i + 1, + Log: payload, + } + } + + metadataBytes, err := json.Marshal(metadataWithLog{RiverLog: logs}) + if err != nil { + panic(err) + } + + return metadataBytes +} diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index 38bb9610..4b5cfb12 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -7,6 +7,7 @@ import ( "io" "log" "log/slog" + "strings" "testing" "github.com/jackc/pgx/v5" @@ -44,6 +45,159 @@ func TestLogger(t *testing.T) { }) } +func TestAppendLogDataWithCap(t *testing.T) { + t.Parallel() + + marshalLog := func(tb testing.TB, attempt int, log string) []byte { + tb.Helper() + b, err := json.Marshal(logAttempt{Attempt: attempt, Log: log}) + require.NoError(tb, err) + return b + } + + marshalMetadataWithLogs := func(tb testing.TB, logs []logAttempt) []byte { + tb.Helper() + b, err := json.Marshal(map[string]any{ + metadataKey: logs, + }) + require.NoError(tb, err) + return b + } + + unmarshalLogs := func(tb testing.TB, rawArray []byte) []logAttempt { + tb.Helper() + var logs []logAttempt + require.NoError(tb, json.Unmarshal(rawArray, &logs)) + return logs + } + + t.Run("MissingKeyStartsFromEmptyArray", func(t *testing.T) { + t.Parallel() + + newEntry := marshalLog(t, 1, "new") + result, dropped, err := appendLogDataWithCap([]byte(`{"other":"value"}`), newEntry, maxTotalBytes) + require.NoError(t, err) + require.Zero(t, dropped) + require.Equal(t, []logAttempt{{Attempt: 1, Log: "new"}}, unmarshalLogs(t, result)) + }) + + t.Run("NonArrayLogValueReturnsError", func(t *testing.T) { + t.Parallel() + + newEntry := marshalLog(t, 1, "new") + _, _, err := appendLogDataWithCap([]byte(`{"river:log":{"not":"array"}}`), newEntry, maxTotalBytes) + require.EqualError(t, err, `"river:log" value is not an array`) + }) + + t.Run("PrunesOldestEntriesOnlyAsNeeded", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + } + newEntry := marshalLog(t, 4, "d") + + target, err := json.Marshal([]logAttempt{ + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }) + require.NoError(t, err) + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, len(target)) + require.NoError(t, err) + require.Equal(t, 1, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: "b"}, + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("KeepsNewestEntryEvenIfOverCap", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + } + newEntry := marshalLog(t, 3, strings.Repeat("x", 64)) + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 8) + require.NoError(t, err) + require.Equal(t, len(existing), dropped) + require.Equal(t, []logAttempt{ + {Attempt: 3, Log: strings.Repeat("x", 64)}, + }, unmarshalLogs(t, result)) + }) + + t.Run("NoCapKeepsEverything", func(t *testing.T) { + t.Parallel() + + existing := []logAttempt{ + {Attempt: 1, Log: "a"}, + } + newEntry := marshalLog(t, 2, "b") + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, 0) + require.NoError(t, err) + require.Zero(t, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 1, Log: "a"}, + {Attempt: 2, Log: "b"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("LargeExistingPayloadPrunesToCap", func(t *testing.T) { + t.Parallel() + + // Simulate externally-written oversized payloads in metadata. + existing := []logAttempt{ + {Attempt: 1, Log: strings.Repeat("x", 4*1024*1024)}, + } + newEntry := marshalLog(t, 2, "new") + + // Cap is small enough that only the new entry can remain. + const maxTotalBytes = 256 + + result, dropped, err := appendLogDataWithCap(marshalMetadataWithLogs(t, existing), newEntry, maxTotalBytes) + require.NoError(t, err) + require.Equal(t, 1, dropped) + require.LessOrEqual(t, len(result), maxTotalBytes) + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: "new"}, + }, unmarshalLogs(t, result)) + }) + + t.Run("WhitespaceSeparatorsCountTowardsCap", func(t *testing.T) { + t.Parallel() + + // jsonb canonicalization in Postgres usually removes separator + // whitespace, so this shape is unlikely for production rows loaded from + // the database. We still keep this test because the cap logic must remain + // correct for any valid JSON input. If separators include spaces/newlines, + // those bytes still count toward MaxTotalBytes and must be included in the + // retention calculation. + existingMetadata := []byte(`{"river:log":[{"attempt":1,"log":"a"}, {"attempt":2,"log":"b"}, {"attempt":3,"log":"c"}]}`) + newEntry := marshalLog(t, 4, "d") + + // This cap is large enough to fit [2,3,4] only if separator whitespace is + // ignored. Correct behavior must count all bytes in the final JSON. + const maxTotalBytes = 73 + + result, dropped, err := appendLogDataWithCap(existingMetadata, newEntry, maxTotalBytes) + require.NoError(t, err) + require.LessOrEqual(t, len(result), maxTotalBytes) + require.Equal(t, 2, dropped) + require.Equal(t, []logAttempt{ + {Attempt: 3, Log: "c"}, + {Attempt: 4, Log: "d"}, + }, unmarshalLogs(t, result)) + }) +} + func TestLoggerSafely(t *testing.T) { t.Parallel() @@ -270,6 +424,107 @@ func TestMiddleware(t *testing.T) { ) }) + t.Run("PrunesOldestEntriesAtMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + maxTotalBytes, err := json.Marshal([]logAttempt{ + {Attempt: 1, Log: `msg="Logged from worker"` + "\n"}, + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + }) + require.NoError(t, err) + + testWorker, bundle := setup(t, &MiddlewareConfig{ + // Keep two entries, force pruning once a third is appended. + MaxTotalBytes: len(maxTotalBytes) + 16, + }) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + var metadataWithLog metadataWithLog + require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog)) + + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + {Attempt: 3, Log: `msg="Logged from worker"` + "\n"}, + }, metadataWithLog.RiverLog) + }) + + t.Run("RetainsLatestEntryWhenTotalLimitTiny", func(t *testing.T) { + t.Parallel() + + testWorker, bundle := setup(t, &MiddlewareConfig{ + MaxTotalBytes: 1, + }) + + workRes, err := testWorker.Work(ctx, t, bundle.tx, loggingArgs{Message: "Logged from worker"}, nil) + require.NoError(t, err) + + // Set state back to available and unfinalize the job to make it runnable again. + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: workRes.Job.ID, + FinalizedAtDoUpdate: true, + FinalizedAt: nil, + StateDoUpdate: true, + State: rivertype.JobStateAvailable, + }) + require.NoError(t, err) + + workRes, err = testWorker.WorkJob(ctx, t, bundle.tx, workRes.Job) + require.NoError(t, err) + + var metadataWithLog metadataWithLog + require.NoError(t, json.Unmarshal(workRes.Job.Metadata, &metadataWithLog)) + + require.Equal(t, []logAttempt{ + {Attempt: 2, Log: `msg="Logged from worker"` + "\n"}, + }, metadataWithLog.RiverLog) + }) + + t.Run("DefaultsMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t, nil) + + require.Equal(t, maxTotalBytes, bundle.middleware.config.MaxTotalBytes) + }) + + t.Run("ClampsMaxTotalBytes", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t, &MiddlewareConfig{ + MaxTotalBytes: maxTotalBytesMax + 1, + }) + + require.Equal(t, maxTotalBytesMax, bundle.middleware.config.MaxTotalBytes) + }) + t.Run("RawMiddleware", func(t *testing.T) { t.Parallel()