From d2056fe355b16f3dabe37ae42921c42aea53e329 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 17 Feb 2026 12:07:36 +0000 Subject: [PATCH 1/2] fix: prevent MVCC race in blockRunWithWaitpoint pending check Split the CTE in blockRunWithWaitpoint so the pending waitpoint check is a separate SQL statement. In READ COMMITTED isolation, each statement gets its own snapshot, so a separate SELECT sees the latest committed state from concurrent completeWaitpoint calls. Previously, the CTE did INSERT + pending check in one statement (one snapshot). If completeWaitpoint committed between the CTE start and the SELECT, the SELECT would still see PENDING due to the stale snapshot. Neither side would enqueue continueRunIfUnblocked, leaving the run stuck forever. --- .../src/engine/systems/waitpointSystem.ts | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 40a92abb55..e998fb685f 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -397,8 +397,10 @@ export class WaitpointSystem { return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); - //block the run with the waitpoints, returning how many waitpoints are pending - const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>` + // Insert the blocking connections and the historical run connections. + // We use a CTE to do both inserts atomically. Data-modifying CTEs are + // always executed regardless of whether they're referenced in the outer query. + await prisma.$queryRaw` WITH inserted AS ( INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex") SELECT @@ -423,12 +425,21 @@ export class WaitpointSystem { WHERE w.id IN (${Prisma.join($waitpoints)}) ON CONFLICT DO NOTHING ) + SELECT COUNT(*) FROM inserted`; + + // Check if the run is actually blocked using a separate query. + // This MUST be a separate statement from the CTE above because in READ COMMITTED + // isolation, each statement gets its own snapshot. The CTE's snapshot is taken when + // it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE + // won't see it. This fresh query gets a new snapshot that reflects the latest commits. + const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>` SELECT COUNT(*) as pending_count - FROM inserted i - JOIN "Waitpoint" w ON w.id = i."waitpointId" - WHERE w.status = 'PENDING';`; + FROM "Waitpoint" + WHERE id IN (${Prisma.join($waitpoints)}) + AND status = 'PENDING' + `; - const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0; + const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0; let newStatus: TaskRunExecutionStatus = "SUSPENDED"; if ( From 39fa6d24a2ca29bb5de56dc15e18cbd116df6d6f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 17 Feb 2026 12:23:28 +0000 Subject: [PATCH 2/2] docs: add detailed comment explaining MVCC-safe two-statement design in blockRunWithWaitpoint --- .../src/engine/systems/waitpointSystem.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index e998fb685f..95478c7641 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -364,6 +364,22 @@ export class WaitpointSystem { /** * Prevents a run from continuing until the waitpoint is completed. + * + * This method uses two separate SQL statements intentionally: + * + * 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and + * _WaitpointRunConnections rows (historical connections). + * + * 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING. + * + * These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation: + * each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between + * the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using + * a separate SELECT, we get a fresh snapshot that reflects the latest committed state. + * + * The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted + * by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT + * DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking. */ async blockRunWithWaitpoint({ runId,