From ffa5b2b2489d1b40f3a6f48996b7552947843613 Mon Sep 17 00:00:00 2001 From: Jack Cassidy Date: Thu, 30 Oct 2025 16:59:29 +0000 Subject: [PATCH] add: JobCountByAllStates method to Client --- client.go | 12 ++++++++++++ client_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/client.go b/client.go index 89f8a1ff..d5c85b8b 100644 --- a/client.go +++ b/client.go @@ -2275,6 +2275,18 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara return res, nil } +// JobCountByAllStates returns a map of counts for all job states. +// The provided context is used for the underlying database query. +func (c *Client[TTx]) JobCountByAllStates(ctx context.Context) (map[rivertype.JobState]int, error) { + if !c.driver.PoolIsSet() { + return nil, errNoDriverDBPool + } + + return c.driver.GetExecutor().JobCountByAllStates(ctx, &riverdriver.JobCountByAllStatesParams{ + Schema: c.config.Schema, + }) +} + // PeriodicJobs returns the currently configured set of periodic jobs for the // client, and can be used to add new or remove existing ones. // diff --git a/client_test.go b/client_test.go index 77c494fb..2997ad3b 100644 --- a/client_test.go +++ b/client_test.go @@ -4389,6 +4389,58 @@ func Test_Client_JobList(t *testing.T) { }) } +func Test_Client_JobCountByAllStates(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + schema string + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = newTestConfig(t, schema) + client = newTestClient(t, dbPool, config) + ) + + return client, &testBundle{ + exec: client.driver.GetExecutor(), + schema: schema, + } + } + + t.Run("CountsAllStatesWithZerosForMissing", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable), Schema: bundle.schema}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), Schema: bundle.schema}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), Schema: bundle.schema}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStatePending), Schema: bundle.schema}) + + counts, err := client.JobCountByAllStates(ctx) + require.NoError(t, err) + + require.Equal(t, 1, counts[rivertype.JobStateAvailable]) + require.Equal(t, 2, counts[rivertype.JobStateRunning]) + require.Equal(t, 1, counts[rivertype.JobStatePending]) + // verify a few states default to zero when absent + require.Equal(t, 0, counts[rivertype.JobStateScheduled]) + require.Equal(t, 0, counts[rivertype.JobStateCompleted]) + require.Equal(t, 0, counts[rivertype.JobStateDiscarded]) + require.Equal(t, 0, counts[rivertype.JobStateCancelled]) + require.Equal(t, 0, counts[rivertype.JobStateRetryable]) + }) +} + func Test_Client_JobRetry(t *testing.T) { t.Parallel()