diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 11c88d01..10d21372 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -253,7 +253,7 @@ jobs:
     name: lint
     runs-on: ubuntu-latest
     env:
-      GOLANGCI_LINT_VERSION: v2.4.0
+      GOLANGCI_LINT_VERSION: v2.7.2
     permissions:
       contents: read
       # allow read access to pull request. Use with `only-new-issues` option.
diff --git a/.golangci.yaml b/.golangci.yaml
index d324d749..743b3f4a 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -27,6 +27,7 @@ linters:
     - ireturn # bans returning interfaces; questionable as is, but also buggy as hell; very, very annoying
     - lll # restricts maximum line length; annoying
    - nlreturn # requires a blank line before returns; annoying
+    - unqueryvet # bans all use of `SELECT *`; just ... sigh
     - wsl # a bunch of style/whitespace stuff; annoying
     - wsl_v5 # a second version of the first annoying wsl; how nice
 
diff --git a/client.go b/client.go
index a2c21875..4d54b39a 100644
--- a/client.go
+++ b/client.go
@@ -1238,7 +1238,8 @@ type SubscribeConfig struct {
     Kinds []EventKind
 }
 
-// Special internal variant that lets us inject an overridden size.
+// SubscribeConfig is a special internal variant of Subscribe that lets us
+// inject an overridden channel size.
 func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
     if c.subscriptionManager == nil {
         panic("created a subscription on a client that will never work jobs (Queues not configured)")
@@ -1446,7 +1447,7 @@ func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRo
     })
 }
 
-// JobDelete deletes the job with the given ID from the database, returning the
+// JobDeleteTx deletes the job with the given ID from the database, returning the
 // deleted row if it was deleted. Jobs in the running state are not deleted,
 // instead returning rivertype.ErrJobRunning. This variant lets a caller retry a
 // job atomically alongside other database changes. A deleted job isn't deleted
@@ -2029,10 +2030,10 @@ func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyPar
     return len(res), nil
 }
 
-// InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism,
-// making the operation quite fast and memory efficient. Each job is inserted as
-// an InsertManyParams tuple, which takes job args along with an optional set of
-// insert options, which override insert options provided by an
+// InsertManyFastTx inserts many jobs at once using Postgres' `COPY FROM`
+// mechanism, making the operation quite fast and memory efficient. Each job is
+// inserted as an InsertManyParams tuple, which takes job args along with an
+// optional set of insert options, which override insert options provided by an
 // JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
 // The provided context is used for the underlying Postgres inserts and can be
 // used to cancel the operation or apply a timeout.
@@ -2443,7 +2444,8 @@ func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle {
     return c.periodicJobs
 }
 
-// Driver exposes the underlying pilot used by the client.
+// Pilot returns the pilot in use by the client. If not configured, this is often
+// simply StandardPilot.
 //
 // API is not stable. DO NOT USE.
 func (c *Client[TTx]) Pilot() riverpilot.Pilot {
@@ -2649,7 +2651,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
     return nil
 }
 
-// QueueResume resumes the queue with the given name. If the queue was
+// QueueResumeTx resumes the queue with the given name. If the queue was
 // previously paused, any clients configured to work that queue will resume
 // fetching additional jobs. To resume all queues at once, use the special queue
 // name "*".
diff --git a/client_context.go b/client_context.go
index 70845c22..4bbded2e 100644
--- a/client_context.go
+++ b/client_context.go
@@ -33,9 +33,9 @@ func ClientFromContext[TTx any](ctx context.Context) *Client[TTx] {
     return client
 }
 
-// ClientFromContext returns the Client from the context. This function can
-// only be used within a Worker's Work() method because that is the only place
-// River sets the Client on the context.
+// ClientFromContextSafely returns the Client from the context. This function
+// can only be used within a Worker's Work() method because that is the only
+// place River sets the Client on the context.
 //
 // It returns an error if the context does not contain a Client, which will
 // never happen from the context provided to a Worker's Work() method.
diff --git a/client_test.go b/client_test.go
index d07c1a67..90ccc43e 100644
--- a/client_test.go
+++ b/client_test.go
@@ -7858,7 +7858,9 @@ func (a JobArgsStaticKind) Kind() string {
 
 type JobArgsReflectKind[TKind any] struct{}
 
-func (a JobArgsReflectKind[TKind]) Kind() string { return reflect.TypeOf(a).Name() }
+func (a JobArgsReflectKind[TKind]) Kind() string {
+    return reflect.TypeFor[JobArgsReflectKind[TKind]]().Name()
+}
 
 func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
     t.Parallel()
diff --git a/internal/dbunique/unique_fields.go b/internal/dbunique/unique_fields.go
index 773f4f5c..a8696108 100644
--- a/internal/dbunique/unique_fields.go
+++ b/internal/dbunique/unique_fields.go
@@ -91,8 +91,8 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec
         // Check for `river:"unique"` tag, possibly among other comma-separated values
         var hasUniqueTag bool
         if riverTag, ok := field.Tag.Lookup("river"); ok {
-            tags := strings.Split(riverTag, ",")
-            for _, tag := range tags {
+            tags := strings.SplitSeq(riverTag, ",")
+            for tag := range tags {
                 if strings.TrimSpace(tag) == "unique" {
                     hasUniqueTag = true
                 }
diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go
index bd9d42fd..4d386dd3 100644
--- a/internal/jobexecutor/job_executor.go
+++ b/internal/jobexecutor/job_executor.go
@@ -8,6 +8,7 @@ import (
     "fmt"
     "log/slog"
     "runtime"
+    "strings"
     "time"
 
     "github.com/tidwall/gjson"
@@ -507,13 +508,13 @@ func captureStackTraceSkipFrames(skip int) string {
     n := runtime.Callers(skip, pcs)
     frames := runtime.CallersFrames(pcs[:n])
 
-    var stackTrace string
+    var stackTraceSB strings.Builder
     for {
         frame, more := frames.Next()
-        stackTrace += fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line)
+        stackTraceSB.WriteString(fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line))
         if !more {
             break
         }
     }
 
-    return stackTrace
+    return stackTraceSB.String()
 }
diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go
index 97fbf0c2..69ddd75c 100644
--- a/internal/leadership/elector.go
+++ b/internal/leadership/elector.go
@@ -589,7 +589,6 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea
     var elected bool
     if alreadyElected {
         elected, err = execTx.LeaderAttemptReelect(ctx, params)
-
     } else {
         elected, err = execTx.LeaderAttemptElect(ctx, params)
     }
diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go
index e86e9bb2..f3216561 100644
--- a/internal/maintenance/job_cleaner.go
+++ b/internal/maintenance/job_cleaner.go
@@ -20,7 +20,7 @@ import (
     "github.com/riverqueue/river/rivershared/util/timeutil"
 )
 
-// Test-only properties.
+// JobCleanerTestSignals are internal signals used exclusively in tests.
 type JobCleanerTestSignals struct {
     DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
 }
diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go
index 6d2dc957..c7353b36 100644
--- a/internal/maintenance/job_rescuer.go
+++ b/internal/maintenance/job_rescuer.go
@@ -29,7 +29,7 @@ const (
     JobRescuerIntervalDefault = 30 * time.Second
 )
 
-// Test-only properties.
+// JobRescuerTestSignals are internal signals used exclusively in tests.
 type JobRescuerTestSignals struct {
     FetchedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
     UpdatedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go
index a770e40d..d1f44ffd 100644
--- a/internal/maintenance/job_rescuer_test.go
+++ b/internal/maintenance/job_rescuer_test.go
@@ -148,8 +148,8 @@ func TestJobRescuer(t *testing.T) {
 
         // Marked as cancelled by query:
         cancelTime := time.Now().UTC().Format(time.RFC3339Nano)
-        stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
-        stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
+        stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: fmt.Appendf(nil, `{"cancel_attempted_at": %q}`, cancelTime), MaxAttempts: ptrutil.Ptr(5)})
+        stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: fmt.Appendf(nil, `{"cancel_attempted_at": %q}`, cancelTime), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
 
         // these aren't touched because they're in ineligible states
         notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go
index 56d1b8bd..00adcee4 100644
--- a/internal/maintenance/job_scheduler.go
+++ b/internal/maintenance/job_scheduler.go
@@ -25,7 +25,7 @@ const (
     JobSchedulerIntervalDefault = 5 * time.Second
 )
 
-// Test-only properties.
+// JobSchedulerTestSignals are internal signals used exclusively in tests.
 type JobSchedulerTestSignals struct {
     NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification
     ScheduledBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
@@ -36,7 +36,7 @@ func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB) {
     ts.ScheduledBatch.Init(tb)
 }
 
-// NotifyInsert is a function to call to emit notifications for queues where
+// NotifyInsertFunc is a function to call to emit notifications for queues where
 // jobs were scheduled.
 type NotifyInsertFunc func(ctx context.Context, execTx riverdriver.ExecutorTx, queues []string) error
diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go
index 5c56b5d7..8ea0c042 100644
--- a/internal/maintenance/periodic_job_enqueuer.go
+++ b/internal/maintenance/periodic_job_enqueuer.go
@@ -29,7 +29,7 @@ import (
 // signal that there's no job to insert at this time.
 var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")
 
-// Test-only properties.
+// PeriodicJobEnqueuerTestSignals are internal signals used exclusively in tests.
 type PeriodicJobEnqueuerTestSignals struct {
     EnteredLoop  testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
     InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
@@ -263,7 +263,7 @@ func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHand
     }
 }
 
-// Remove removes a periodic job from the enqueuer by ID. Its current target run
+// RemoveByID removes a periodic job from the enqueuer by ID. Its current target run
 // time and all future runs are cancelled.
 func (s *PeriodicJobEnqueuer) RemoveByID(id string) bool {
     s.mu.Lock()
@@ -291,8 +291,8 @@ func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.Periodic
     }
 }
 
-// RemoveMany removes many periodic jobs from the enqueuer by ID. Their current
-// target run time and all future runs are cancelled.
+// RemoveManyByID removes many periodic jobs from the enqueuer by ID. Their
+// current target run time and all future runs are cancelled.
 func (s *PeriodicJobEnqueuer) RemoveManyByID(ids []string) {
     s.mu.Lock()
     defer s.mu.Unlock()
diff --git a/internal/maintenance/queue_cleaner.go b/internal/maintenance/queue_cleaner.go
index 3eb1d302..f64e97db 100644
--- a/internal/maintenance/queue_cleaner.go
+++ b/internal/maintenance/queue_cleaner.go
@@ -26,7 +26,7 @@ const (
     QueueRetentionPeriodDefault = 24 * time.Hour
 )
 
-// Test-only properties.
+// QueueCleanerTestSignals are internal signals used exclusively in tests.
 type QueueCleanerTestSignals struct {
     DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
 }
diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go
index 0728bdda..7ff4bde9 100644
--- a/internal/maintenance/reindexer.go
+++ b/internal/maintenance/reindexer.go
@@ -18,6 +18,8 @@ import (
 const (
     ReindexerIntervalDefault = 24 * time.Hour
 
+    // ReindexerTimeoutDefault is the default timeout of the reindexer.
+    //
     // We've had user reports of builds taking 45 seconds on large tables, so
     // set a timeout of that plus a little margin. Use of `CONCURRENTLY` should
     // prevent index operations that run a little long from impacting work from
@@ -37,7 +39,7 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals
     "river_job_unique_idx",
 }
 
-// Test-only properties.
+// ReindexerTestSignals are internal signals used exclusively in tests.
 type ReindexerTestSignals struct {
     Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
 }
diff --git a/internal/riverinternaltest/sharedtx/shared_tx.go b/internal/riverinternaltest/sharedtx/shared_tx.go
index 74e14ec8..ef5c5588 100644
--- a/internal/riverinternaltest/sharedtx/shared_tx.go
+++ b/internal/riverinternaltest/sharedtx/shared_tx.go
@@ -95,8 +95,11 @@ func (e *SharedTx) QueryRow(ctx context.Context, query string, args ...any) pgx.
     return &SharedTxRow{sharedTxDerivative{sharedTx: e}, row}
 }
 
+//
 // These are all implemented so that a SharedTx can be used as a pgx.Tx, but are
 // all non-functional.
+//
+
 func (e *SharedTx) Conn() *pgx.Conn                  { panic("not implemented") }
 func (e *SharedTx) Commit(ctx context.Context) error { panic("not implemented") }
 func (e *SharedTx) LargeObjects() pgx.LargeObjects   { panic("not implemented") }
@@ -163,17 +166,20 @@ func (r *SharedTxRows) Close() {
     r.innerRows.Close()
 }
 
+//
 // All of these are simple pass throughs.
+//
+
 func (r *SharedTxRows) CommandTag() pgconn.CommandTag { return r.innerRows.CommandTag() }
 func (r *SharedTxRows) Conn() *pgx.Conn               { return nil }
 func (r *SharedTxRows) Err() error                    { return r.innerRows.Err() }
 func (r *SharedTxRows) FieldDescriptions() []pgconn.FieldDescription {
     return r.innerRows.FieldDescriptions()
 }
-func (r *SharedTxRows) Next() bool                     { return r.innerRows.Next() }
-func (r *SharedTxRows) RawValues() [][]byte            { return r.innerRows.RawValues() }
-func (r *SharedTxRows) Scan(dest ...any) error         { return r.innerRows.Scan(dest...) }
-func (r *SharedTxRows) Values() ([]interface{}, error) { return r.innerRows.Values() }
+func (r *SharedTxRows) Next() bool             { return r.innerRows.Next() }
+func (r *SharedTxRows) RawValues() [][]byte    { return r.innerRows.RawValues() }
+func (r *SharedTxRows) Scan(dest ...any) error { return r.innerRows.Scan(dest...) }
+func (r *SharedTxRows) Values() ([]any, error) { return r.innerRows.Values() }
 
 // SharedSubTx wraps a pgx.Tx such that it unlocks SharedTx when it commits or
 // rolls back.
diff --git a/producer.go b/producer.go
index 37837480..951c1b9d 100644
--- a/producer.go
+++ b/producer.go
@@ -266,8 +266,8 @@ func (p *producer) Stop() {
     p.Logger.Debug(p.Name+": Stop returned", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
 }
 
-// Start starts the producer. It backgrounds a goroutine which is stopped when
-// context is cancelled or Stop is invoked.
+// StartWorkContext starts the producer. It backgrounds a goroutine which is
+// stopped when context is cancelled or Stop is invoked.
 //
 // When fetchCtx is cancelled, no more jobs will be fetched; however, if a fetch
 // is already in progress, It will be allowed to complete and run any fetched
diff --git a/retry_policy.go b/retry_policy.go
index a1cb35c8..ab66775e 100644
--- a/retry_policy.go
+++ b/retry_policy.go
@@ -25,7 +25,7 @@ type ClientRetryPolicy interface {
     NextRetry(job *rivertype.JobRow) time.Time
 }
 
-// River's default retry policy.
+// DefaultClientRetryPolicy is River's default retry policy.
 type DefaultClientRetryPolicy struct {
     timeNowFunc func() time.Time
 }
diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go
index df31d6c5..4ecf3a90 100644
--- a/riverdriver/riverdatabasesql/river_database_sql_driver.go
+++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go
@@ -1108,7 +1108,7 @@ type templateReplaceWrapper struct {
     replacer *sqlctemplate.Replacer
 }
 
-func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
+func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...any) (sql.Result, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.ExecContext(ctx, sql, args...)
 }
@@ -1118,12 +1118,12 @@ func (w templateReplaceWrapper) PrepareContext(ctx context.Context, sql string)
     return w.dbtx.PrepareContext(ctx, sql)
 }
 
-func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
+func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...any) (*sql.Rows, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.QueryContext(ctx, sql, args...)
 }
 
-func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row {
+func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...any) *sql.Row {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.QueryRowContext(ctx, sql, args...)
 }
diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go
index c9965432..27aade13 100644
--- a/riverdriver/riverdrivertest/riverdrivertest.go
+++ b/riverdriver/riverdrivertest/riverdrivertest.go
@@ -1607,7 +1607,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         )
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 ID:        ptrutil.Ptr(idStart + int64(i)),
                 CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
@@ -1668,7 +1668,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         exec, _ := setup(ctx, t)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 EncodedArgs: []byte(`{"encoded": "args"}`),
                 Kind:        "test_kind",
@@ -1826,7 +1826,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         now := time.Now().UTC().Add(-1 * time.Minute)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 CreatedAt:   ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
                 EncodedArgs: []byte(`{"encoded": "args"}`),
@@ -1881,7 +1881,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         exec, _ := setup(ctx, t)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 EncodedArgs: []byte(`{"encoded": "args"}`),
                 Kind:        "test_kind",
@@ -2011,7 +2011,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         exec, _ := setup(ctx, t)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 CreatedAt:   nil, // explicit nil
                 EncodedArgs: []byte(`{"encoded": "args"}`),
@@ -2050,7 +2050,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         exec, _ := setup(ctx, t)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 EncodedArgs: []byte(`{"encoded": "args"}`),
                 Kind:        "test_kind",
@@ -2097,7 +2097,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         now := time.Now().UTC().Add(-1 * time.Minute)
 
         insertParams := make([]*riverdriver.JobInsertFastParams, 10)
-        for i := 0; i < len(insertParams); i++ {
+        for i := range insertParams {
             insertParams[i] = &riverdriver.JobInsertFastParams{
                 CreatedAt:   ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
                 EncodedArgs: []byte(`{"encoded": "args"}`),
@@ -3143,7 +3143,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
         now := time.Now().UTC()
 
         job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
-            Metadata:    []byte(fmt.Sprintf(`{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339))),
+            Metadata:    fmt.Appendf(nil, `{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339)),
             State:       ptrutil.Ptr(rivertype.JobStateRunning),
             ScheduledAt: ptrutil.Ptr(now.Add(-10 * time.Second)),
         })
diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
index 9490b486..c9650d6a 100644
--- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go
+++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
@@ -1157,17 +1157,17 @@ func (w templateReplaceWrapper) Begin(ctx context.Context) (pgx.Tx, error) {
     return w.dbtx.Begin(ctx)
 }
 
-func (w templateReplaceWrapper) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
+func (w templateReplaceWrapper) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.Exec(ctx, sql, args...)
 }
 
-func (w templateReplaceWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
+func (w templateReplaceWrapper) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.Query(ctx, sql, args...)
 }
 
-func (w templateReplaceWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
+func (w templateReplaceWrapper) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.QueryRow(ctx, sql, args...)
 }
diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go
index ccfab633..7e6af4bc 100644
--- a/riverdriver/riversqlite/river_sqlite_driver.go
+++ b/riverdriver/riversqlite/river_sqlite_driver.go
@@ -1536,7 +1536,7 @@ type templateReplaceWrapper struct {
     replacer *sqlctemplate.Replacer
 }
 
-func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
+func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...any) (sql.Result, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.ExecContext(ctx, sql, args...)
 }
@@ -1546,12 +1546,12 @@ func (w templateReplaceWrapper) PrepareContext(ctx context.Context, sql string)
     return w.dbtx.PrepareContext(ctx, sql)
 }
 
-func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
+func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...any) (*sql.Rows, error) {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.QueryContext(ctx, sql, args...)
 }
 
-func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row {
+func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...any) *sql.Row {
     sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
     return w.dbtx.QueryRowContext(ctx, sql, args...)
 }
diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go
index e248ac2c..1c28ed7e 100644
--- a/rivermigrate/river_migrate.go
+++ b/rivermigrate/river_migrate.go
@@ -177,8 +177,8 @@ func (m *Migrator[TTx]) ExistingVersions(ctx context.Context) ([]Migration, erro
     return versions, nil
 }
 
-// ExistingVersions gets the existing set of versions that have been migrated in
-// the database, ordered by version.
+// ExistingVersionsTx gets the existing set of versions that have been migrated
+// in the database, ordered by version.
 //
 // This variant checks for existing versions in a transaction.
 func (m *Migrator[TTx]) ExistingVersionsTx(ctx context.Context, tx TTx) ([]Migration, error) {
@@ -367,7 +367,7 @@ func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) {
     })
 }
 
-// Validate validates the current state of migrations, returning an unsuccessful
+// ValidateTx validates the current state of migrations, returning an unsuccessful
 // validation and usable message in case there are migrations that haven't yet
 // been applied.
 //
diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go
index ee298e5c..acc0cbed 100644
--- a/rivershared/riverpilot/pilot.go
+++ b/rivershared/riverpilot/pilot.go
@@ -112,6 +112,8 @@ type PilotPeriodicJob interface {
     PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error)
 }
 
+// PeriodicJob represents a durable periodic job.
+//
 // TODO: Get rid of this in favor of rivertype.PeriodicJob the next time we're
 // making River <-> River Pro API contract changes.
 type PeriodicJob struct {
diff --git a/rivershared/riversharedmaintenance/river_shared_maintenance.go b/rivershared/riversharedmaintenance/river_shared_maintenance.go
index 1ee8243e..82487300 100644
--- a/rivershared/riversharedmaintenance/river_shared_maintenance.go
+++ b/rivershared/riversharedmaintenance/river_shared_maintenance.go
@@ -11,12 +11,14 @@ import (
     "github.com/riverqueue/river/rivershared/util/serviceutil"
 )
 
+// Maintainers will sleep a brief period of time between batches to give the
+// database some breathing room.
 const (
-    // Maintainers will sleep a brief period of time between batches to give the
-    // database some breathing room.
     BatchBackoffMax = 1 * time.Second
     BatchBackoffMin = 50 * time.Millisecond
+)
 
+const (
     LogPrefixRanSuccessfully = ": Ran successfully"
     LogPrefixRunLoopStarted  = ": Run loop started"
     LogPrefixRunLoopStopped  = ": Run loop stopped"
diff --git a/rivershared/riversharedtest/riversharedtest.go b/rivershared/riversharedtest/riversharedtest.go
index 88bb6927..25bded08 100644
--- a/rivershared/riversharedtest/riversharedtest.go
+++ b/rivershared/riversharedtest/riversharedtest.go
@@ -204,8 +204,9 @@ func Logger(tb testing.TB) *slog.Logger {
     return slogtest.NewLogger(tb, nil)
 }
 
-// Logger returns a logger suitable for use in tests which outputs only at warn
-// or above. Useful in tests where particularly noisy log output is expected.
+// LoggerWarn returns a logger suitable for use in tests which outputs only at
+// warn or above. Useful in tests where particularly noisy log output is
+// expected.
 func LoggerWarn(tb testutil.TestingTB) *slog.Logger {
     tb.Helper()
     return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelWarn})
diff --git a/rivershared/sqlctemplate/sqlc_template.go b/rivershared/sqlctemplate/sqlc_template.go
index 716081cd..88c02edb 100644
--- a/rivershared/sqlctemplate/sqlc_template.go
+++ b/rivershared/sqlctemplate/sqlc_template.go
@@ -43,6 +43,7 @@ import (
     "context"
     "errors"
     "fmt"
+    "maps"
     "regexp"
     "slices"
     "strconv"
@@ -249,12 +250,8 @@ func (r *Replacer) RunSafely(ctx context.Context, argPlaceholder, sql string, ar
 // merged, with the new params taking precedent.
 func WithReplacements(ctx context.Context, replacements map[string]Replacement, namedArgs map[string]any) context.Context {
     if container, ok := ctx.Value(contextKey{}).(*contextContainer); ok {
-        for arg, val := range namedArgs {
-            container.NamedArgs[arg] = val
-        }
-        for template, replacement := range replacements {
-            container.Replacements[template] = replacement
-        }
+        maps.Copy(container.NamedArgs, namedArgs)
+        maps.Copy(container.Replacements, replacements)
 
         return ctx
     }
diff --git a/rivershared/startstop/start_stop.go b/rivershared/startstop/start_stop.go
index 59e26922..b03cbf24 100644
--- a/rivershared/startstop/start_stop.go
+++ b/rivershared/startstop/start_stop.go
@@ -231,8 +231,8 @@ func (s *BaseStartStop) Stopped() <-chan struct{} {
     return s.stopped
 }
 
-// StoppedWithoutLock returns a channel that can be waited on for the service to
-// be stopped.
+// StoppedUnsafe returns a channel that can be waited on for the service to be
+// stopped.
 //
 // Unlike Stopped, this returns the struct's internal channel directly without
 // preallocation and without taking a lock on the mutex (making it safe to call
diff --git a/rivershared/startstoptest/startstoptest.go b/rivershared/startstoptest/startstoptest.go
index 1c11fe4d..ae08bcbf 100644
--- a/rivershared/startstoptest/startstoptest.go
+++ b/rivershared/startstoptest/startstoptest.go
@@ -80,7 +80,7 @@ func StressErr(ctx context.Context, tb testingT, svc startstop.Service, allowedS
 
 // Minimal interface for *testing.B/*testing.T that lets us test a failure
 // condition for our test helpers above.
 type testingT interface {
-    Errorf(format string, args ...interface{})
+    Errorf(format string, args ...any)
     FailNow()
     Helper()
 }
diff --git a/rivershared/startstoptest/startstoptest_test.go b/rivershared/startstoptest/startstoptest_test.go
index 3515bd7f..d38d0f7a 100644
--- a/rivershared/startstoptest/startstoptest_test.go
+++ b/rivershared/startstoptest/startstoptest_test.go
@@ -76,6 +76,6 @@ func newMockTestingT(tb testing.TB) *mockTestingT {
     return &mockTestingT{tb: tb}
 }
 
-func (t *mockTestingT) Errorf(format string, args ...interface{}) {}
-func (t *mockTestingT) FailNow()                                  { t.failed.Store(true) }
-func (t *mockTestingT) Helper()                                   { t.tb.Helper() }
+func (t *mockTestingT) Errorf(format string, args ...any) {}
+func (t *mockTestingT) FailNow()                          { t.failed.Store(true) }
+func (t *mockTestingT) Helper()                           { t.tb.Helper() }
diff --git a/rivershared/util/randutil/rand_util_test.go b/rivershared/util/randutil/rand_util_test.go
index ce72d18c..316ac20f 100644
--- a/rivershared/util/randutil/rand_util_test.go
+++ b/rivershared/util/randutil/rand_util_test.go
@@ -54,7 +54,7 @@ func TestIntBetween(t *testing.T) {
 //
 
 func BenchmarkRandV2(b *testing.B) {
-    for range b.N {
+    for b.Loop() {
         _ = rand.IntN(1984)
     }
 }
@@ -68,7 +68,7 @@ func BenchmarkCryptoSource(b *testing.B) {
         return nBig.Int64()
     }
 
-    for range b.N {
+    for b.Loop() {
         _ = intN(1984)
     }
 }
diff --git a/rivershared/util/serviceutil/service_util.go b/rivershared/util/serviceutil/service_util.go
index dde3bbe6..d3702044 100644
--- a/rivershared/util/serviceutil/service_util.go
+++ b/rivershared/util/serviceutil/service_util.go
@@ -23,7 +23,7 @@ func CancellableSleep(ctx context.Context, sleepDuration time.Duration) {
     }
 }
 
-// CancellableSleep sleeps for the given duration, but returns early if context
+// CancellableSleepC sleeps for the given duration, but returns early if context
 // has been cancelled.
 //
 // This variant returns a channel that should be waited on and which will be
@@ -62,7 +62,7 @@ const MaxAttemptsBeforeResetDefault = 7
 //   - 8s
 //   - 16s
 //   - 32s
-//   - 64s
+//   - 64s.
 func ExponentialBackoff(attempt, maxAttemptsBeforeReset int) time.Duration {
     retrySeconds := exponentialBackoffSecondsWithoutJitter(attempt, maxAttemptsBeforeReset)
diff --git a/rivershared/util/testutil/job_args_reflect_kind.go b/rivershared/util/testutil/job_args_reflect_kind.go
index c77a74d9..bd8eb9e3 100644
--- a/rivershared/util/testutil/job_args_reflect_kind.go
+++ b/rivershared/util/testutil/job_args_reflect_kind.go
@@ -28,4 +28,6 @@ import "reflect"
 // so the type doesn't pollute the global namespace.
 type JobArgsReflectKind[TKind any] struct{}
 
-func (a JobArgsReflectKind[TKind]) Kind() string { return reflect.TypeOf(a).Name() }
+func (a JobArgsReflectKind[TKind]) Kind() string {
+    return reflect.TypeFor[JobArgsReflectKind[TKind]]().Name()
+}
diff --git a/rivershared/util/testutil/test_util.go b/rivershared/util/testutil/test_util.go
index a2d4c353..99f7d3b9 100644
--- a/rivershared/util/testutil/test_util.go
+++ b/rivershared/util/testutil/test_util.go
@@ -106,7 +106,7 @@ func (t *MockT) LogOutput() string {
 
 func (t *MockT) Name() string { return "MockT" }
 
-// TestingT is an interface wrapper around *testing.T that's implemented by all
+// TestingTB is an interface wrapper around *testing.T that's implemented by all
 // of *testing.T, *testing.F, and *testing.B.
 //
 // It's used internally to verify that River's test assertions are working as
diff --git a/rivershared/util/valutil/val_util.go b/rivershared/util/valutil/val_util.go
index a8ef572c..aebeb479 100644
--- a/rivershared/util/valutil/val_util.go
+++ b/rivershared/util/valutil/val_util.go
@@ -13,8 +13,8 @@ func ValOrDefault[T comparable](val, defaultVal T) T {
     return defaultVal
 }
 
-// ValOrDefault returns the given value if it's non-zero, and otherwise invokes
-// defaultFunc to produce a default value.
+// ValOrDefaultFunc returns the given value if it's non-zero, and otherwise
+// invokes defaultFunc to produce a default value.
 func ValOrDefaultFunc[T comparable](val T, defaultFunc func() T) T {
     var zero T
     if val != zero {
diff --git a/rivertest/rivertest.go b/rivertest/rivertest.go
index 33f9d864..27a2f921 100644
--- a/rivertest/rivertest.go
+++ b/rivertest/rivertest.go
@@ -31,8 +31,8 @@ type testingT interface {
     Logf(format string, args ...any)
 }
 
-// Options for RequireInserted functions including expectations for various
-// queuing properties that stem from InsertOpts.
+// RequireInsertedOpts are options for RequireInserted functions including
+// expectations for various queuing properties that stem from InsertOpts.
 //
 // Multiple properties set on this struct increase the specificity on a job to
 // match, acting like an AND condition on each.
@@ -111,7 +111,7 @@ func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobAr
     t.Helper()
     actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
     return actualArgs
 }
@@ -147,7 +147,7 @@ func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Job
     var driver TDriver
     actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
     return actualArgs
 }
@@ -170,12 +170,12 @@ func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo
     }
 
     if len(jobRows) < 1 {
-        failure(t, "No jobs found with kind: %s", expectedJob.Kind())
+        failuref(t, "No jobs found with kind: %s", expectedJob.Kind())
         return nil, nil //nolint:nilnil
     }
 
     if len(jobRows) > 1 {
-        failure(t, "More than one job found with kind: %s (you might want RequireManyInserted instead)", expectedJob.Kind())
+        failuref(t, "More than one job found with kind: %s (you might want RequireManyInserted instead)", expectedJob.Kind())
         return nil, nil //nolint:nilnil
     }
@@ -220,7 +220,7 @@ func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo
     t.Helper()
     err := requireNotInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
 }
@@ -255,7 +255,7 @@ func requireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.
     var driver TDriver
     err := requireNotInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
 }
@@ -281,7 +281,7 @@ func requireNotInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river
     }
 
     if len(jobRows) > 0 && opts == nil {
-        failure(t, "%d jobs found with kind, but expected to find none: %s", len(jobRows), expectedJob.Kind())
+        failuref(t, "%d jobs found with kind, but expected to find none: %s", len(jobRows), expectedJob.Kind())
         return nil
     }
@@ -350,7 +350,7 @@ func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.C
     t.Helper()
     actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJobs)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
     return actualArgs
 }
@@ -396,7 +396,7 @@ func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context
     var driver TDriver
     actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJobs)
     if err != nil {
-        failure(t, "Internal failure: %s", err)
+        failuref(t, "Internal failure: %s", err)
     }
     return actualArgs
 }
@@ -442,7 +442,7 @@ func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx contex
     actualArgsKinds := sliceutil.Map(jobRows, func(j *rivertype.JobRow) string { return j.Kind })
 
     if !slices.Equal(expectedArgsKinds, actualArgsKinds) {
-        failure(t, "Inserted jobs didn't match expectation; expected: %+v, actual: %+v",
+        failuref(t, "Inserted jobs didn't match expectation; expected: %+v, actual: %+v",
             expectedArgsKinds, actualArgsKinds)
         return nil, nil
     }
@@ -590,13 +590,13 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
     // not match all requested conditions, so the RequireNotInserted will not
     // fail). If all properties matched, then like with RequireInserted, we'll
     // have built up failures and are ready to emit a final failure message.
-    failure(t, "Job with kind '%s'%s %s", jobRow.Kind, positionStr(), strings.Join(failures, ", "))
+    failuref(t, "Job with kind '%s'%s %s", jobRow.Kind, positionStr(), strings.Join(failures, ", "))
 
     return false
 }
 
-// failure takes a printf-style directive and is a shortcut for failing an
+// failuref takes a printf-style directive and is a shortcut for failing an
 // assertion.
-func failure(t testingT, format string, a ...any) {
+func failuref(t testingT, format string, a ...any) {
     t.Helper()
     t.Log(failureString(format, a...))
     t.FailNow()
diff --git a/subscription_manager.go b/subscription_manager.go
index d1a01637..3afda983 100644
--- a/subscription_manager.go
+++ b/subscription_manager.go
@@ -207,7 +207,8 @@ func (sm *subscriptionManager) distributeQueueEvent(event *Event) {
     }
 }
 
-// Special internal variant that lets us inject an overridden size.
+// SubscribeConfig is a special internal Subscribe variant that lets us inject
+// an overridden size.
 func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
     if config.ChanSize < 0 {
         panic("SubscribeConfig.ChanSize must be greater or equal to 1")