Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ jobs:
name: lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v2.4.0
GOLANGCI_LINT_VERSION: v2.7.2
permissions:
contents: read
# allow read access to pull request. Use with `only-new-issues` option.
Expand Down
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ linters:
- ireturn # bans returning interfaces; questionable as is, but also buggy as hell; very, very annoying
- lll # restricts maximum line length; annoying
- nlreturn # requires a blank line before returns; annoying
- unqueryvet # bans all use of `SELECT *`; just ... sigh
- wsl # a bunch of style/whitespace stuff; annoying
- wsl_v5 # a second version of the first annoying wsl; how nice

Expand Down
18 changes: 10 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,8 @@ type SubscribeConfig struct {
Kinds []EventKind
}

// Special internal variant that lets us inject an overridden size.
// SubscribeConfig is a special internal variant of Subscribe that lets us
// inject an overridden channel size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if c.subscriptionManager == nil {
panic("created a subscription on a client that will never work jobs (Queues not configured)")
Expand Down Expand Up @@ -1446,7 +1447,7 @@ func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRo
})
}

// JobDelete deletes the job with the given ID from the database, returning the
// JobDeleteTx deletes the job with the given ID from the database, returning the
// deleted row if it was deleted. Jobs in the running state are not deleted,
// instead returning rivertype.ErrJobRunning. This variant lets a caller retry a
// job atomically alongside other database changes. A deleted job isn't deleted
Expand Down Expand Up @@ -2029,10 +2030,10 @@ func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyPar
return len(res), nil
}

// InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
// InsertManyFastTx inserts many jobs at once using Postgres' `COPY FROM`
// mechanism, making the operation quite fast and memory efficient. Each job is
// inserted as an InsertManyParams tuple, which takes job args along with an
// optional set of insert options, which override insert options provided by an
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
Expand Down Expand Up @@ -2443,7 +2444,8 @@ func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle {
return c.periodicJobs
}

// Driver exposes the underlying pilot used by the client.
// Pilot returns the pilot in use by the pilot. If not configured, this is often
// simply StandardPilot.
//
// API is not stable. DO NOT USE.
func (c *Client[TTx]) Pilot() riverpilot.Pilot {
Expand Down Expand Up @@ -2649,7 +2651,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
return nil
}

// QueueResume resumes the queue with the given name. If the queue was
// QueueResumeTx resumes the queue with the given name. If the queue was
// previously paused, any clients configured to work that queue will resume
// fetching additional jobs. To resume all queues at once, use the special queue
// name "*".
Expand Down
6 changes: 3 additions & 3 deletions client_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func ClientFromContext[TTx any](ctx context.Context) *Client[TTx] {
return client
}

// ClientFromContext returns the Client from the context. This function can
// only be used within a Worker's Work() method because that is the only place
// River sets the Client on the context.
// ClientFromContextSafely returns the Client from the context. This function
// can only be used within a Worker's Work() method because that is the only
// place River sets the Client on the context.
//
// It returns an error if the context does not contain a Client, which will
// never happen from the context provided to a Worker's Work() method.
Expand Down
4 changes: 3 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7858,7 +7858,9 @@ func (a JobArgsStaticKind) Kind() string {

type JobArgsReflectKind[TKind any] struct{}

func (a JobArgsReflectKind[TKind]) Kind() string { return reflect.TypeOf(a).Name() }
func (a JobArgsReflectKind[TKind]) Kind() string {
return reflect.TypeFor[JobArgsReflectKind[TKind]]().Name()
}

func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 2 additions & 2 deletions internal/dbunique/unique_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func getSortedUniqueFields(typ reflect.Type, path []string, typesSeen map[reflec
// Check for `river:"unique"` tag, possibly among other comma-separated values
var hasUniqueTag bool
if riverTag, ok := field.Tag.Lookup("river"); ok {
tags := strings.Split(riverTag, ",")
for _, tag := range tags {
tags := strings.SplitSeq(riverTag, ",")
for tag := range tags {
if strings.TrimSpace(tag) == "unique" {
hasUniqueTag = true
}
Expand Down
7 changes: 4 additions & 3 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log/slog"
"runtime"
"strings"
"time"

"github.com/tidwall/gjson"
Expand Down Expand Up @@ -507,13 +508,13 @@ func captureStackTraceSkipFrames(skip int) string {
n := runtime.Callers(skip, pcs)
frames := runtime.CallersFrames(pcs[:n])

var stackTrace string
var stackTraceSB strings.Builder
for {
frame, more := frames.Next()
stackTrace += fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line)
stackTraceSB.WriteString(fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line))
if !more {
break
}
}
return stackTrace
return stackTraceSB.String()
}
1 change: 0 additions & 1 deletion internal/leadership/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,6 @@ func attemptElectOrReelect(ctx context.Context, exec riverdriver.Executor, alrea
var elected bool
if alreadyElected {
elected, err = execTx.LeaderAttemptReelect(ctx, params)

} else {
elected, err = execTx.LeaderAttemptElect(ctx, params)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/riverqueue/river/rivershared/util/timeutil"
)

// Test-only properties.
// JobCleanerTestSignals are internal signals used exclusively in tests.
type JobCleanerTestSignals struct {
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Expand Down
2 changes: 1 addition & 1 deletion internal/maintenance/job_rescuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
JobRescuerIntervalDefault = 30 * time.Second
)

// Test-only properties.
// JobRescuerTestSignals are internal signals used exclusively in tests.
type JobRescuerTestSignals struct {
FetchedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs
UpdatedBatch testsignal.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch
Expand Down
4 changes: 2 additions & 2 deletions internal/maintenance/job_rescuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func TestJobRescuer(t *testing.T) {

// Marked as cancelled by query:
cancelTime := time.Now().UTC().Format(time.RFC3339Nano)
stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: fmt.Appendf(nil, `{"cancel_attempted_at": %q}`, cancelTime), MaxAttempts: ptrutil.Ptr(5)})
stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: fmt.Appendf(nil, `{"cancel_attempted_at": %q}`, cancelTime), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued

// these aren't touched because they're in ineligible states
notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
Expand Down
4 changes: 2 additions & 2 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
JobSchedulerIntervalDefault = 5 * time.Second
)

// Test-only properties.
// JobSchedulerTestSignals are internal signals used exclusively in tests.
type JobSchedulerTestSignals struct {
NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification
ScheduledBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
Expand All @@ -36,7 +36,7 @@ func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB) {
ts.ScheduledBatch.Init(tb)
}

// NotifyInsert is a function to call to emit notifications for queues where
// NotifyInsertFunc is a function to call to emit notifications for queues where
// jobs were scheduled.
type NotifyInsertFunc func(ctx context.Context, execTx riverdriver.ExecutorTx, queues []string) error

Expand Down
8 changes: 4 additions & 4 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// signal that there's no job to insert at this time.
var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

// Test-only properties.
// PeriodicJobEnqueuerTestSignals are internal signals used exclusively in tests.
type PeriodicJobEnqueuerTestSignals struct {
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
Expand Down Expand Up @@ -263,7 +263,7 @@ func (s *PeriodicJobEnqueuer) Remove(periodicJobHandle rivertype.PeriodicJobHand
}
}

// Remove removes a periodic job from the enqueuer by ID. Its current target run
// RemoveByID removes a periodic job from the enqueuer by ID. Its current target run
// time and all future runs are cancelled.
func (s *PeriodicJobEnqueuer) RemoveByID(id string) bool {
s.mu.Lock()
Expand Down Expand Up @@ -291,8 +291,8 @@ func (s *PeriodicJobEnqueuer) RemoveMany(periodicJobHandles []rivertype.Periodic
}
}

// RemoveMany removes many periodic jobs from the enqueuer by ID. Their current
// target run time and all future runs are cancelled.
// RemoveManyByID removes many periodic jobs from the enqueuer by ID. Their
// current target run time and all future runs are cancelled.
func (s *PeriodicJobEnqueuer) RemoveManyByID(ids []string) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/maintenance/queue_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
QueueRetentionPeriodDefault = 24 * time.Hour
)

// Test-only properties.
// QueueCleanerTestSignals are internal signals used exclusively in tests.
type QueueCleanerTestSignals struct {
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
const (
ReindexerIntervalDefault = 24 * time.Hour

// ReindexerTimeoutDefault is the default timeout of the reindexer.
//
// We've had user reports of builds taking 45 seconds on large tables, so
// set a timeout of that plus a little margin. Use of `CONCURRENTLY` should
// prevent index operations that run a little long from impacting work from
Expand All @@ -37,7 +39,7 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals
"river_job_unique_idx",
}

// Test-only properties.
// ReindexerTestSignals are internal signals used exclusively in tests.
type ReindexerTestSignals struct {
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
}
Expand Down
14 changes: 10 additions & 4 deletions internal/riverinternaltest/sharedtx/shared_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ func (e *SharedTx) QueryRow(ctx context.Context, query string, args ...any) pgx.
return &SharedTxRow{sharedTxDerivative{sharedTx: e}, row}
}

//
// These are all implemented so that a SharedTx can be used as a pgx.Tx, but are
// all non-functional.
//

func (e *SharedTx) Conn() *pgx.Conn { panic("not implemented") }
func (e *SharedTx) Commit(ctx context.Context) error { panic("not implemented") }
func (e *SharedTx) LargeObjects() pgx.LargeObjects { panic("not implemented") }
Expand Down Expand Up @@ -163,17 +166,20 @@ func (r *SharedTxRows) Close() {
r.innerRows.Close()
}

//
// All of these are simple pass throughs.
//

func (r *SharedTxRows) CommandTag() pgconn.CommandTag { return r.innerRows.CommandTag() }
func (r *SharedTxRows) Conn() *pgx.Conn { return nil }
func (r *SharedTxRows) Err() error { return r.innerRows.Err() }
func (r *SharedTxRows) FieldDescriptions() []pgconn.FieldDescription {
return r.innerRows.FieldDescriptions()
}
func (r *SharedTxRows) Next() bool { return r.innerRows.Next() }
func (r *SharedTxRows) RawValues() [][]byte { return r.innerRows.RawValues() }
func (r *SharedTxRows) Scan(dest ...any) error { return r.innerRows.Scan(dest...) }
func (r *SharedTxRows) Values() ([]interface{}, error) { return r.innerRows.Values() }
func (r *SharedTxRows) Next() bool { return r.innerRows.Next() }
func (r *SharedTxRows) RawValues() [][]byte { return r.innerRows.RawValues() }
func (r *SharedTxRows) Scan(dest ...any) error { return r.innerRows.Scan(dest...) }
func (r *SharedTxRows) Values() ([]any, error) { return r.innerRows.Values() }

// SharedSubTx wraps a pgx.Tx such that it unlocks SharedTx when it commits or
// rolls back.
Expand Down
4 changes: 2 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ func (p *producer) Stop() {
p.Logger.Debug(p.Name+": Stop returned", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
}

// Start starts the producer. It backgrounds a goroutine which is stopped when
// context is cancelled or Stop is invoked.
// StartWorkContext starts the producer. It backgrounds a goroutine which is
// stopped when context is cancelled or Stop is invoked.
//
// When fetchCtx is cancelled, no more jobs will be fetched; however, if a fetch
// is already in progress, It will be allowed to complete and run any fetched
Expand Down
2 changes: 1 addition & 1 deletion retry_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ClientRetryPolicy interface {
NextRetry(job *rivertype.JobRow) time.Time
}

// River's default retry policy.
// DefaultClientRetryPolicy is River's default retry policy.
type DefaultClientRetryPolicy struct {
timeNowFunc func() time.Time
}
Expand Down
6 changes: 3 additions & 3 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ type templateReplaceWrapper struct {
replacer *sqlctemplate.Replacer
}

func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
func (w templateReplaceWrapper) ExecContext(ctx context.Context, sql string, args ...any) (sql.Result, error) {
sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
return w.dbtx.ExecContext(ctx, sql, args...)
}
Expand All @@ -1118,12 +1118,12 @@ func (w templateReplaceWrapper) PrepareContext(ctx context.Context, sql string)
return w.dbtx.PrepareContext(ctx, sql)
}

func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
func (w templateReplaceWrapper) QueryContext(ctx context.Context, sql string, args ...any) (*sql.Rows, error) {
sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
return w.dbtx.QueryContext(ctx, sql, args...)
}

func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row {
func (w templateReplaceWrapper) QueryRowContext(ctx context.Context, sql string, args ...any) *sql.Row {
sql, args = w.replacer.Run(ctx, argPlaceholder, sql, args)
return w.dbtx.QueryRowContext(ctx, sql, args...)
}
Expand Down
16 changes: 8 additions & 8 deletions riverdriver/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
ID: ptrutil.Ptr(idStart + int64(i)),
CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
Expand Down Expand Up @@ -1668,7 +1668,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
Expand Down Expand Up @@ -1826,7 +1826,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
now := time.Now().UTC().Add(-1 * time.Minute)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
EncodedArgs: []byte(`{"encoded": "args"}`),
Expand Down Expand Up @@ -1881,7 +1881,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
Expand Down Expand Up @@ -2011,7 +2011,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: nil, // explicit nil
EncodedArgs: []byte(`{"encoded": "args"}`),
Expand Down Expand Up @@ -2050,7 +2050,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
exec, _ := setup(ctx, t)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
now := time.Now().UTC().Add(-1 * time.Minute)

insertParams := make([]*riverdriver.JobInsertFastParams, 10)
for i := 0; i < len(insertParams); i++ {
for i := range insertParams {
insertParams[i] = &riverdriver.JobInsertFastParams{
CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)),
EncodedArgs: []byte(`{"encoded": "args"}`),
Expand Down Expand Up @@ -3143,7 +3143,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
now := time.Now().UTC()

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339))),
Metadata: fmt.Appendf(nil, `{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339)),
State: ptrutil.Ptr(rivertype.JobStateRunning),
ScheduledAt: ptrutil.Ptr(now.Add(-10 * time.Second)),
})
Expand Down
Loading
Loading