From 90ee8dc26e54571c71c92ef72b5972edd983a2d5 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 2 Jul 2025 07:07:06 -0700 Subject: [PATCH] Add maximum bound for `attempted_by` array As described by #972, it may be possible for huge numbers of snoozes to blow out a job row's `attempted_by` array as a job is locked over and over again. Multiplied by many jobs, this can produce vast quantities of data that gets sent over the network. Here, put in a ratchet on `attempted_by` so that if the array becomes larger than 100 elements, we knock the oldest one off in favor of the most recent client and the most fresh 99. Unfortunately the implementation isn't particularly clean in either Postgres or SQLite. In Postgres it would've been cleaner if we'd had the `attempted_by` in reverse order so the new client was on front because the built-in array functions would be friendlier to that layout, but because it's not, we have to do something a little hackier involving a `CASE` statement instead. SQLite is even worse. SQLite has no array functions at all, which doesn't help, but moreover every strategy I tried ended up blocked by a sqlc SQLite bug, so after trying everything I could think of, I ended up having to extract the piece that does the array truncation into a SQL template string to get this over the line. This could be removed in the future if any one of a number of outstanding sqlc bugs are fixed (e.g. [1]). [1] https://github.com/sqlc-dev/sqlc/pull/3610 --- CHANGELOG.md | 1 + internal/jobexecutor/job_executor_test.go | 6 +- producer.go | 17 ++- riverdriver/river_driver_interface.go | 13 +- .../internal/dbsqlc/river_job.sql.go | 25 ++-- .../river_database_sql_driver.go | 9 +- riverdriver/riverdrivertest/driver_test.go | 12 +- .../riverdrivertest/riverdrivertest.go | 126 ++++++++++++++---- .../riverpgxv5/internal/dbsqlc/river_job.sql | 11 +- .../internal/dbsqlc/river_job.sql.go | 25 ++-- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 9 +- .../riversqlite/internal/dbsqlc/river_job.sql | 8 +- .../internal/dbsqlc/river_job.sql.go | 24 ++-- .../riversqlite/river_sqlite_driver.go | 59 +++++++- rivershared/riverpilot/standard.go | 2 +- 15 files changed, 257 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cfa1ece..209612fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963). - The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966). - **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970). +- Add maximum bound to each job's `attempted_by` array so that in degenerate cases where a job is run many, many times (say it's snoozed hundreds of times), it doesn't grow to unlimited bounds. [PR #974](https://github.com/riverqueue/river/pull/974). ### Fixed diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index d4f52306..6f70db01 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -159,9 +159,9 @@ func TestJobExecutor_Execute(t *testing.T) { // Fetch the job to make sure it's marked as running: jobs, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - Max: 1, - Now: ptrutil.Ptr(now), - Queue: rivercommon.QueueDefault, + MaxToLock: 1, + Now: ptrutil.Ptr(now), + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) diff --git a/producer.go b/producer.go index 3c2ac4f1..0bd1d680 100644 --- a/producer.go +++ b/producer.go @@ -742,13 +742,18 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC // back to the queue. ctx := context.WithoutCancel(workCtx) + // Maximum size of the `attempted_by` array on each job row. This maximum is + // rarely hit, but exists to protect against degenerate cases. + const maxAttemptedBy = 100 + jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{ - ClientID: p.config.ClientID, - Max: count, - Now: p.Time.NowUTCOrNil(), - Queue: p.config.Queue, - ProducerID: p.id.Load(), - Schema: p.config.Schema, + ClientID: p.config.ClientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: count, + Now: p.Time.NowUTCOrNil(), + Queue: p.config.Queue, + ProducerID: p.id.Load(), + Schema: p.config.Schema, }) if err != nil { p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue)) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index e79ff9e0..d2ae1e44 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -365,12 +365,13 @@ type JobDeleteManyParams struct { } type JobGetAvailableParams struct { - ClientID string - Max int - Now *time.Time - ProducerID int64 - Queue string - Schema string + ClientID string + MaxAttemptedBy int + MaxToLock int + Now *time.Time + ProducerID int64 + Queue string + Schema string } type JobGetByIDParams struct { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index e80219cc..86c70cc2 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -265,13 +265,13 @@ WITH locked_jobs AS ( /* TEMPLATE: schema */river_job WHERE state = 'available' - AND queue = $3::text + AND queue = $4::text AND scheduled_at <= coalesce($1::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $4::integer + LIMIT $5::integer FOR UPDATE SKIP LOCKED ) @@ -281,7 +281,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce($1::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, $2::text) + attempted_by = array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= $2::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. + THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - $2:] + ELSE river_job.attempted_by + END, + $3::text + ) FROM locked_jobs WHERE @@ -291,18 +298,20 @@ RETURNING ` type JobGetAvailableParams struct { - Now *time.Time - AttemptedBy string - Queue string - Max int32 + Now *time.Time + MaxAttemptedBy int32 + AttemptedBy string + Queue string + MaxToLock int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { rows, err := db.QueryContext(ctx, jobGetAvailable, arg.Now, + arg.MaxAttemptedBy, arg.AttemptedBy, arg.Queue, - arg.Max, + arg.MaxToLock, ) if err != nil { return nil, err diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 1b14110b..3620a324 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -241,10 +241,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - Now: params.Now, - Queue: params.Queue, + AttemptedBy: params.ClientID, + MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec + MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec + Now: params.Now, + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/riverdriver/riverdrivertest/driver_test.go b/riverdriver/riverdrivertest/driver_test.go index 66ac0acd..ae2a792f 100644 --- a/riverdriver/riverdrivertest/driver_test.go +++ b/riverdriver/riverdrivertest/driver_test.go @@ -399,9 +399,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { for range b.N { if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: river.QueueDefault, + ClientID: clientID, + MaxToLock: 100, + Queue: river.QueueDefault, }); err != nil { b.Fatal(err) } @@ -425,9 +425,9 @@ func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { if _, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: river.QueueDefault, + ClientID: clientID, + MaxToLock: 100, + Queue: river.QueueDefault, }); err != nil { b.Fatal(err) } diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 8efc3802..d36f19f7 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -1004,6 +1004,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobGetAvailable", func(t *testing.T) { t.Parallel() + const ( + maxAttemptedBy = 10 + maxToLock = 100 + ) + t.Run("Success", func(t *testing.T) { t.Parallel() @@ -1012,9 +1017,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{}) jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1033,9 +1039,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Two rows inserted but only one found because of the added limit. jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 1, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 1, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1052,9 +1059,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Job is in a non-default queue so it's not found. jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Empty(t, jobRows) @@ -1073,10 +1081,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Job is scheduled a while from now so it's not found. jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Now: &now, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Now: &now, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Empty(t, jobRows) @@ -1098,10 +1107,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 100, - Now: ptrutil.Ptr(now), - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Now: ptrutil.Ptr(now), + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 1) @@ -1121,9 +1131,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, } jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 2, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 2, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.Len(t, jobRows, 2, "expected to fetch exactly 2 jobs") @@ -1140,15 +1151,84 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, // Should fetch the one remaining job on the next attempt: jobRows, err = exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ - ClientID: clientID, - Max: 1, - Queue: rivercommon.QueueDefault, + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: 1, + Queue: rivercommon.QueueDefault, }) require.NoError(t, err) require.NoError(t, err) require.Len(t, jobRows, 1, "expected to fetch exactly 1 job") require.Equal(t, 3, jobRows[0].Priority, "expected final job to have priority 3") }) + + t.Run("AttemptedByAtMaxTruncated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + attemptedBy := make([]string, maxAttemptedBy) + for i := range maxAttemptedBy { + attemptedBy[i] = "attempt_" + strconv.Itoa(i) + } + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + AttemptedBy: attemptedBy, + }) + + // Job is in a non-default queue so it's not found. + jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, + }) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, append( + attemptedBy[1:], + clientID, + ), jobRow.AttemptedBy) + require.Len(t, jobRow.AttemptedBy, maxAttemptedBy) + }) + + // Almost identical to the above, but tests that there are more existing + // `attempted_by` elements than the maximum allowed. There's a fine bug + // around use of > versus >= in the query's conditional, so make sure to + // capture both cases to make sure they work. + t.Run("AttemptedByOverMaxTruncated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + attemptedBy := make([]string, maxAttemptedBy+1) + for i := range maxAttemptedBy + 1 { + attemptedBy[i] = "attempt_" + strconv.Itoa(i) + } + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + AttemptedBy: attemptedBy, + }) + + // Job is in a non-default queue so it's not found. + jobRows, err := exec.JobGetAvailable(ctx, &riverdriver.JobGetAvailableParams{ + ClientID: clientID, + MaxAttemptedBy: maxAttemptedBy, + MaxToLock: maxToLock, + Queue: rivercommon.QueueDefault, + }) + require.NoError(t, err) + require.Len(t, jobRows, 1) + + jobRow := jobRows[0] + require.Equal(t, append( + attemptedBy[2:], // start at 2 because there were 2 extra elements + clientID, + ), jobRow.AttemptedBy) + require.Len(t, jobRow.AttemptedBy, maxAttemptedBy) + }) }) t.Run("JobGetByID", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 023624ae..61a64bbd 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -159,7 +159,7 @@ WITH locked_jobs AS ( priority ASC, scheduled_at ASC, id ASC - LIMIT @max::integer + LIMIT @max_to_lock::integer FOR UPDATE SKIP LOCKED ) @@ -169,7 +169,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, @attempted_by::text) + attempted_by = array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. + THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:] + ELSE river_job.attempted_by + END, + @attempted_by::text + ) FROM locked_jobs WHERE diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index eb719128..d451d5a8 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -262,13 +262,13 @@ WITH locked_jobs AS ( /* TEMPLATE: schema */river_job WHERE state = 'available' - AND queue = $3::text + AND queue = $4::text AND scheduled_at <= coalesce($1::timestamptz, now()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $4::integer + LIMIT $5::integer FOR UPDATE SKIP LOCKED ) @@ -278,7 +278,14 @@ SET state = 'running', attempt = river_job.attempt + 1, attempted_at = coalesce($1::timestamptz, now()), - attempted_by = array_append(river_job.attempted_by, $2::text) + attempted_by = array_append( + CASE WHEN array_length(river_job.attempted_by, 1) >= $2::int + -- +2 instead of +1 because Postgres array indexing starts at 1, not 0. + THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - $2:] + ELSE river_job.attempted_by + END, + $3::text + ) FROM locked_jobs WHERE @@ -288,18 +295,20 @@ RETURNING ` type JobGetAvailableParams struct { - Now *time.Time - AttemptedBy string - Queue string - Max int32 + Now *time.Time + MaxAttemptedBy int32 + AttemptedBy string + Queue string + MaxToLock int32 } func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { rows, err := db.Query(ctx, jobGetAvailable, arg.Now, + arg.MaxAttemptedBy, arg.AttemptedBy, arg.Queue, - arg.Max, + arg.MaxToLock, ) if err != nil { return nil, err diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 04da6f46..d5620ac3 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -245,10 +245,11 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - Now: params.Now, - Queue: params.Queue, + AttemptedBy: params.ClientID, + MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec + MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec + Now: params.Now, + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index c4ae650a..d466830c 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -100,7 +100,11 @@ UPDATE /* TEMPLATE: schema */river_job SET attempt = river_job.attempt + 1, attempted_at = coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - attempted_by = json_insert(coalesce(attempted_by, json('[]')), '$[#]', cast(@attempted_by AS text)), + + -- This is replaced in the driver to work around sqlc bugs for SQLite. See + -- comments there for more details. + attempted_by = /* TEMPLATE_BEGIN: attempted_by_clause */ attempted_by /* TEMPLATE_END */, + state = 'running' WHERE id IN ( SELECT id @@ -114,7 +118,7 @@ WHERE id IN ( priority ASC, scheduled_at ASC, id ASC - LIMIT @max + LIMIT @max_to_lock ) RETURNING *; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 8da71bfe..cdea9049 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -210,42 +210,40 @@ UPDATE /* TEMPLATE: schema */river_job SET attempt = river_job.attempt + 1, attempted_at = coalesce(cast(?1 AS text), datetime('now', 'subsec')), - attempted_by = json_insert(coalesce(attempted_by, json('[]')), '$[#]', cast(?2 AS text)), + + -- This is replaced in the driver to work around sqlc bugs for SQLite. See + -- comments there for more details. + attempted_by = /* TEMPLATE_BEGIN: attempted_by_clause */ attempted_by /* TEMPLATE_END */, + state = 'running' WHERE id IN ( SELECT id FROM /* TEMPLATE: schema */river_job WHERE priority >= 0 - AND river_job.queue = ?3 + AND river_job.queue = ?2 AND scheduled_at <= coalesce(cast(?1 AS text), datetime('now', 'subsec')) AND state = 'available' ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT ?4 + LIMIT ?3 ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` type JobGetAvailableParams struct { - Now *string - AttemptedBy string - Queue string - Max int64 + Now *string + Queue string + MaxToLock int64 } // Differs from the Postgres version in that we don't have `FOR UPDATE SKIP // LOCKED`. It doesn't exist in SQLite, but more aptly, there's only one writer // on SQLite at a time, so nothing else has the rows locked. func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvailableParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetAvailable, - arg.Now, - arg.AttemptedBy, - arg.Queue, - arg.Max, - ) + rows, err := db.QueryContext(ctx, jobGetAvailable, arg.Now, arg.Queue, arg.MaxToLock) if err != nil { return nil, err } diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 5c5e3196..e093761e 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -330,12 +330,63 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel return sliceutil.MapError(jobs, jobRowFromInternal) } +// This really sucks, but this SQL fragment's been extracted to a string because +// sqlc is buggy and can't parse it. +// +// The SQL appends a new `attempted_by` to a job's `attempted_by` array (a jsonb +// array), which in SQLite is really quite difficult to to because there aren't +// any arrays or array functions. I tried every version of this in sqlc I could +// come up with, but there was a bug in every direction that blocked it. e.g. +// +// - This version almost works, but as soon as you add a subselect like `FROM ( +// ... )`, SQLite bugs out and can't handle it. +// +// - I tried putting it in a CTE, but this is paired with an `UPDATE` statement, +// and `UPDATE ... FROM` isn't support in sqlc for SQLite. +// +// I'm really hoping this could be fixed one day by by bringing it back in with +// the rest of the job definitions, but it'll require some sqlc fixes for that +// to work. Frustratingly, some of these fixes actually exist already [1], but +// just can't be merged/release due to bottlenecks in the sqlc project. +// +// [1] https://github.com/sqlc-dev/sqlc/pull/3610 +// +//nolint:gochecknoglobals +var jobGetAvailableAttemptedBySQL = strings.TrimSpace(` + json_insert( + ( + SELECT json_group_array(value) + FROM ( + SELECT * + FROM ( + SELECT * + FROM json_each(attempted_by) + ORDER BY key DESC + LIMIT @max_attempted_by - 1 + ) + ORDER BY key + ) + ), + '$[#]', + @attempted_by + ) +`) + func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { + ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ + "attempted_by_clause": { + Stable: true, // input never changes + Value: jobGetAvailableAttemptedBySQL, + }, + }, map[string]any{ + "attempted_by": params.ClientID, + "max_attempted_by": params.MaxAttemptedBy, + }) + jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{ - AttemptedBy: params.ClientID, - Max: int64(params.Max), - Now: timeStringNullable(params.Now), - Queue: params.Queue, + MaxToLock: int64(params.MaxToLock), + Now: timeStringNullable(params.Now), + Queue: params.Queue, }) if err != nil { return nil, interpretError(err) diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index cfbec8da..2e1c576a 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -14,7 +14,7 @@ type StandardPilot struct { } func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { - if params.Max <= 0 { + if params.MaxToLock <= 0 { return nil, nil } return exec.JobGetAvailable(ctx, params)