Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963).
- The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966).
- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970).
- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is run many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974).

### Fixed

Expand Down
6 changes: 3 additions & 3 deletions internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func TestJobExecutor_Execute(t *testing.T) {

// Fetch the job to make sure it's marked as running:
jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
Max: 1,
Now: ptrutil.Ptr(now),
Queue: rivercommon.QueueDefault,
MaxToLock: 1,
Now: ptrutil.Ptr(now),
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
// back to the queue.
ctx := context.WithoutCancel(workCtx)

// Maximum size of the `attempted_by` array on each job row. This maximum is
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100

jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
Max: count,
Now: p.Time.NowUTCOrNil(),
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: count,
Now: p.Time.NowUTCOrNil(),
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
})
if err != nil {
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))
Expand Down
13 changes: 7 additions & 6 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,13 @@ type JobDeleteManyParams struct {
}

type JobGetAvailableParams struct {
ClientID string
Max int
Now *time.Time
ProducerID int64
Queue string
Schema string
ClientID string
MaxAttemptedBy int
MaxToLock int
Now *time.Time
ProducerID int64
Queue string
Schema string
}

type JobGetByIDParams struct {
Expand Down
25 changes: 17 additions & 8 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel

func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
AttemptedBy: params.ClientID,
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
Now: params.Now,
Queue: params.Queue,
AttemptedBy: params.ClientID,
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
Now: params.Now,
Queue: params.Queue,
})
if err != nil {
return nil, interpretError(err)
Expand Down
12 changes: 6 additions & 6 deletions riverdriver/riverdrivertest/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {

for range b.N {
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Queue: river.QueueDefault,
ClientID: clientID,
MaxToLock: 100,
Queue: river.QueueDefault,
}); err != nil {
b.Fatal(err)
}
Expand All @@ -425,9 +425,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Queue: river.QueueDefault,
ClientID: clientID,
MaxToLock: 100,
Queue: river.QueueDefault,
}); err != nil {
b.Fatal(err)
}
Expand Down
126 changes: 103 additions & 23 deletions riverdriver/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
t.Run("JobGetAvailable", func(t *testing.T) {
t.Parallel()

const (
maxAttemptedBy = 10
maxToLock = 100
)

t.Run("Success", func(t *testing.T) {
t.Parallel()

Expand All @@ -1012,9 +1017,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{})

jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 1)
Expand All @@ -1033,9 +1039,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// Two rows inserted but only one found because of the added limit.
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 1,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: 1,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 1)
Expand All @@ -1052,9 +1059,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// Job is in a non-default queue so it's not found.
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Empty(t, jobRows)
Expand All @@ -1073,10 +1081,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// Job is scheduled a while from now so it's not found.
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Now: &now,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Now: &now,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Empty(t, jobRows)
Expand All @@ -1098,10 +1107,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
})

jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 100,
Now: ptrutil.Ptr(now),
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Now: ptrutil.Ptr(now),
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 1)
Expand All @@ -1121,9 +1131,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
}

jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 2,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: 2,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs")
Expand All @@ -1140,15 +1151,84 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

// Should fetch the one remaining job on the next attempt:
jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
Max: 1,
Queue: rivercommon.QueueDefault,
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: 1,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.NoError(t, err)
require.Len(t, jobRows, 1, "expected to fetch exactly 1 job")
require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3")
})

t.Run("AttemptedByAtMaxTruncated", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

attemptedBy := make([]string, maxAttemptedBy)
for i := range maxAttemptedBy {
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
}

_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
AttemptedBy: attemptedBy,
})

// Job is in a non-default queue so it's not found.
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 1)

jobRow := jobRows[0]
require.Equal(t, append(
attemptedBy[1:],
clientID,
), jobRow.AttemptedBy)
require.Len(t, jobRow.AttemptedBy, maxAttemptedBy)
})

// Almost identical to the above, but tests that there are more existing
// `attempted_by` elements than the maximum allowed. There's a fine bug
// around use of > versus >= in the query's conditional, so make sure to
// capture both cases to make sure they work.
t.Run("AttemptedByOverMaxTruncated", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

attemptedBy := make([]string, maxAttemptedBy+1)
for i := range maxAttemptedBy + 1 {
attemptedBy[i] = "attempt_" + strconv.Itoa(i)
}

_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
AttemptedBy: attemptedBy,
})

// Job is in a non-default queue so it's not found.
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{
ClientID: clientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: maxToLock,
Queue: rivercommon.QueueDefault,
})
require.NoError(t, err)
require.Len(t, jobRows, 1)

jobRow := jobRows[0]
require.Equal(t, append(
attemptedBy[2:], // start at 2 because there were 2 extra elements
clientID,
), jobRow.AttemptedBy)
require.Len(t, jobRow.AttemptedBy, maxAttemptedBy)
})
})

t.Run("JobGetByID", func(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ WITH locked_jobs AS (
priority ASC,
scheduled_at ASC,
id ASC
LIMIT @max::integer
LIMIT @max_to_lock::integer
FOR UPDATE
SKIP LOCKED
)
Expand All @@ -169,7 +169,14 @@ SET
state = 'running',
attempt = river_job.attempt + 1,
attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()),
attempted_by = array_append(river_job.attempted_by, @attempted_by::text)
attempted_by = array_append(
CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int
-- +2 instead of +1 because Postgres array indexing starts at 1, not 0.
THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:]
ELSE river_job.attempted_by
END,
@attempted_by::text
)
FROM
locked_jobs
WHERE
Expand Down
Loading
Loading