diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cfa1ece..209612fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963). - The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966). - **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970). +- Add a maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is run many, many times (say it's snoozed hundreds of times), it doesn't grow without bound. [PR #974](https://github.com/riverqueue/river/pull/974). 
### Fixed diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index d4f52306..6f70db01 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -159,9 +159,9 @@ func TestJobExecutor_Execute(t *testing.T) { // Fetch the job to make sure it's marked as running: jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - Max: 1, - Now: ptrutil.Ptr(now), - Queue: rivercommon.QueueDefault, + MaxToLock: 1, + Now: ptrutil.Ptr(now), + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) diff --git a/producer.go b/producer.go index 3c2ac4f1..0bd1d680 100644 --- a/producer.go +++ b/producer.go @@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC // back to the queue. ctx := context.WithoutCancel(workCtx) + // Maximum size of the `attempted_by` array on each job row. This maximum is + // rarely hit, but exists to protect against degenerate cases. 
+ const maxAttemptedBy = 100 + jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{ - ClientID: p.config.ClientID, - Max: count, - Now: p.Time.NowUTCOrNil(), - Queue: p.config.Queue, - ProducerID: p.id.Load(), - Schema: p.config.Schema, + ClientID: p.config.ClientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: count, + Now: p.Time.NowUTCOrNil(), + Queue: p.config.Queue, + ProducerID: p.id.Load(), + Schema: p.config.Schema, }) if err != nil { p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue)) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index e79ff9e0..d2ae1e44 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -365,12 +365,13 @@ type JobDeleteManyParams struct { } type JobGetAvailableParams struct { - ClientID string - Max int - Now *time.Time - ProducerID int64 - Queue string - Schema string + ClientID string + MaxAttemptedBy int + MaxToLock int + Now *time.Time + ProducerID int64 + Queue string + Schema string } type JobGetByIDParams struct { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index e80219cc..86c70cc2 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -265,13 +265,13 @@ WITH locked_jobs AS ( /* TEMPLATE: schema */river_job WHERE state = 'available' - AND queue = $3::text + AND queue = $4::text AND scheduled_at <= coalesce($1::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $4::integer + LIMIT $5::integer FOR UPDATE SKIP LOCKED ) @@ -281,7 +281,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce($1::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, $2::text) + attempted_by = 
array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= $2::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. + THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - $2:] + ELSE river_job.attempted_by + END, + $3::text + ) FROM locked_jobs WHERE @@ -291,18 +298,20 @@ RETURNING ` type JobGetAvailableParams struct { - Now *time.Time - AttemptedBy string - Queue string - Max int32 + Now *time.Time + MaxAttemptedBy int32 + AttemptedBy string + Queue string + MaxToLock int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { rows, err := db.QueryContext(ctx, jobGetAvailable, arg.Now, + arg.MaxAttemptedBy, arg.AttemptedBy, arg.Queue, - arg.Max, + arg.MaxToLock, ) if err != nil { return nil, err diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 1b14110b..3620a324 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -241,10 +241,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - Now: params.Now, - Queue: params.Queue, + AttemptedBy: params.ClientID, + MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec + MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec + Now: params.Now, + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/riverdriver/riverdrivertest/driver_test.go b/riverdriver/riverdrivertest/driver_test.go index 
66ac0acd..ae2a792f 100644 --- a/riverdriver/riverdrivertest/driver_test.go +++ b/riverdriver/riverdrivertest/driver_test.go @@ -399,9 +399,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { for range b.N { if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: river.QueueDefault, + ClientID: clientID, + MaxToLock: 100, + Queue: river.QueueDefault, }); err != nil { b.Fatal(err) } @@ -425,9 +425,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: river.QueueDefault, + ClientID: clientID, + MaxToLock: 100, + Queue: river.QueueDefault, }); err != nil { b.Fatal(err) } diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 8efc3802..d36f19f7 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -1004,6 +1004,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobGetAvailable", func(t *testing.T) { t.Parallel() + const ( + maxAttemptedBy = 10 + maxToLock = 100 + ) + t.Run("Success", func(t *testing.T) { t.Parallel() @@ -1012,9 +1017,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{}) jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1033,9 +1039,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Two rows inserted but only one found because of the added limit. 
jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 1, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 1, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1052,9 +1059,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Job is in a non-default queue so it's not found. jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Empty(t, jobRows) @@ -1073,10 +1081,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Job is scheduled a while from now so it's not found. jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Now: &now, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Now: &now, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Empty(t, jobRows) @@ -1098,10 +1107,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Now: ptrutil.Ptr(now), - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Now: ptrutil.Ptr(now), + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1121,9 +1131,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, } jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 2, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 2, + Queue: 
rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs") @@ -1140,15 +1151,84 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Should fetch the one remaining job on the next attempt: jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 1, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 1, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.NoError(t, err) require.Len(t, jobRows, 1, "expected to fetch exactly 1 job") require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3") }) + + t.Run("AttemptedByAtMaxTruncated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + attemptedBy := make([]string, maxAttemptedBy) + for i := range maxAttemptedBy { + attemptedBy[i] = "attempt_" + strconv.Itoa(i) + } + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + AttemptedBy: attemptedBy, + }) + + // Job already has the maximum number of `attempted_by` entries, so fetching it should drop the oldest one to make room for this client. + jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, + }) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, append( + attemptedBy[1:], + clientID, + ), jobRow.AttemptedBy) + require.Len(t, jobRow.AttemptedBy, maxAttemptedBy) + }) + + // Almost identical to the above, but tests that there are more existing + // `attempted_by` elements than the maximum allowed. It's easy to introduce a subtle bug + // around use of > versus >= in the query's conditional, so make sure to + // capture both cases to make sure they work. 
+ t.Run("AttemptedByOverMaxTruncated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + attemptedBy := make([]string, maxAttemptedBy+1) + for i := range maxAttemptedBy + 1 { + attemptedBy[i] = "attempt_" + strconv.Itoa(i) + } + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + AttemptedBy: attemptedBy, + }) + + // Job has one more `attempted_by` entry than the maximum, so fetching it should drop the two oldest entries and end up exactly at the maximum. + jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, + }) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, append( + attemptedBy[2:], // start at 2: one element over the maximum, plus one dropped to make room for clientID + clientID, + ), jobRow.AttemptedBy) + require.Len(t, jobRow.AttemptedBy, maxAttemptedBy) + }) }) t.Run("JobGetByID", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 023624ae..61a64bbd 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -159,7 +159,7 @@ WITH locked_jobs AS ( priority ASC, scheduled_at ASC, id ASC - LIMIT @max::integer + LIMIT @max_to_lock::integer FOR UPDATE SKIP LOCKED ) @@ -169,7 +169,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, @attempted_by::text) + attempted_by = array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. 
+ THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:] + ELSE river_job.attempted_by + END, + @attempted_by::text + ) FROM locked_jobs WHERE diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index eb719128..d451d5a8 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -262,13 +262,13 @@ WITH locked_jobs AS ( /* TEMPLATE: schema */river_job WHERE state = 'available' - AND queue = $3::text + AND queue = $4::text AND scheduled_at <= coalesce($1::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $4::integer + LIMIT $5::integer FOR UPDATE SKIP LOCKED ) @@ -278,7 +278,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce($1::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, $2::text) + attempted_by = array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= $2::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. 
+ THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - $2:] + ELSE river_job.attempted_by + END, + $3::text + ) FROM locked_jobs WHERE @@ -288,18 +295,20 @@ RETURNING ` type JobGetAvailableParams struct { - Now *time.Time - AttemptedBy string - Queue string - Max int32 + Now *time.Time + MaxAttemptedBy int32 + AttemptedBy string + Queue string + MaxToLock int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { rows, err := db.Query(ctx, jobGetAvailable, arg.Now, + arg.MaxAttemptedBy, arg.AttemptedBy, arg.Queue, - arg.Max, + arg.MaxToLock, ) if err != nil { return nil, err diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 04da6f46..d5620ac3 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -245,10 +245,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - Now: params.Now, - Queue: params.Queue, + AttemptedBy: params.ClientID, + MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec + MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec + Now: params.Now, + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index c4ae650a..d466830c 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -100,7 +100,11 @@ UPDATE /* TEMPLATE: 
schema */river_job SET attempt = river_job.attempt + 1, attempted_at = coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - attempted_by = json_insert(coalesce(attempted_by, json('[]')), '$[#]', cast(@attempted_by AS text)), + + -- This is replaced in the driver to work around sqlc bugs for SQLite. See + -- comments there for more details. + attempted_by = /* TEMPLATE_BEGIN: attempted_by_clause */ attempted_by /* TEMPLATE_END */, + state = 'running' WHERE id IN ( SELECT id @@ -114,7 +118,7 @@ WHERE id IN ( priority ASC, scheduled_at ASC, id ASC - LIMIT @max + LIMIT @max_to_lock ) RETURNING *; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 8da71bfe..cdea9049 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -210,42 +210,40 @@ UPDATE /* TEMPLATE: schema */river_job SET attempt = river_job.attempt + 1, attempted_at = coalesce(cast(?1 AS text), datetime('now', 'subsec')), - attempted_by = json_insert(coalesce(attempted_by, json('[]')), '$[#]', cast(?2 AS text)), + + -- This is replaced in the driver to work around sqlc bugs for SQLite. See + -- comments there for more details. 
+ attempted_by = /* TEMPLATE_BEGIN: attempted_by_clause */ attempted_by /* TEMPLATE_END */, + state = 'running' WHERE id IN ( SELECT id FROM /* TEMPLATE: schema */river_job WHERE priority >= 0 - AND river_job.queue = ?3 + AND river_job.queue = ?2 AND scheduled_at <= coalesce(cast(?1 AS text), datetime('now', 'subsec')) AND state = 'available' ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT ?4 + LIMIT ?3 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` type JobGetAvailableParams struct { - Now *string - AttemptedBy string - Queue string - Max int64 + Now *string + Queue string + MaxToLock int64 } // Differs from the Postgres version in that we don't have `FOR UPDATE SKIP // LOCKED`. It doesn't exist in SQLite, but more aptly, there's only one writer // on SQLite at a time, so nothing else has the rows locked. func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetAvailable, - arg.Now, - arg.AttemptedBy, - arg.Queue, - arg.Max, - ) + rows, err := db.QueryContext(ctx, jobGetAvailable, arg.Now, arg.Queue, arg.MaxToLock) if err != nil { return nil, err } diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 5c5e3196..e093761e 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -330,12 +330,63 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel return sliceutil.MapError(jobs, jobRowFromInternal) } +// This really sucks, but this SQL fragment's been extracted to a string because +// sqlc is buggy and can't parse it. 
+// +// The SQL appends a new `attempted_by` to a job's `attempted_by` array (a jsonb +// array), which in SQLite is really quite difficult to do because there aren't +// any arrays or array functions. I tried every version of this in sqlc I could +// come up with, but there was a bug in every direction that blocked it. e.g. +// +// - This version almost works, but as soon as you add a subselect like `FROM ( +// ... )`, SQLite bugs out and can't handle it. +// +// - I tried putting it in a CTE, but this is paired with an `UPDATE` statement, +// and `UPDATE ... FROM` isn't supported in sqlc for SQLite. +// +// I'm really hoping this could be fixed one day by bringing it back in with +// the rest of the job definitions, but it'll require some sqlc fixes for that +// to work. Frustratingly, some of these fixes actually exist already [1], but +// just can't be merged/released due to bottlenecks in the sqlc project. +// +// [1] https://github.com/sqlc-dev/sqlc/pull/3610 +// +//nolint:gochecknoglobals +var jobGetAvailableAttemptedBySQL = strings.TrimSpace(` + json_insert( + ( + SELECT json_group_array(value) + FROM ( + SELECT * + FROM ( + SELECT * + FROM json_each(attempted_by) + ORDER BY key DESC + LIMIT @max_attempted_by - 1 + ) + ORDER BY key + ) + ), + '$[#]', + @attempted_by + ) +`) + func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { + ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ + "attempted_by_clause": { + Stable: true, // input never changes + Value: jobGetAvailableAttemptedBySQL, + }, + }, map[string]any{ + "attempted_by": params.ClientID, + "max_attempted_by": params.MaxAttemptedBy, + }) + jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int64(params.Max), - Now: timeStringNullable(params.Now), - Queue: params.Queue, + MaxToLock: 
int64(params.MaxToLock), + Now: timeStringNullable(params.Now), + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index cfbec8da..2e1c576a 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -14,7 +14,7 @@ type StandardPilot struct { } func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { - if params.Max <= 0 { + if params.MaxToLock <= 0 { return nil, nil } return exec.JobGetAvailable(ctx, params)