diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 40a92abb55..95478c7641 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -364,6 +364,22 @@ export class WaitpointSystem { /** * Prevents a run from continuing until the waitpoint is completed. + * + * This method uses two separate SQL statements intentionally: + * + * 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and + * _WaitpointRunConnections rows (historical connections). + * + * 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING. + * + * These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation: + * each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between + * the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using + * a separate SELECT, we get a fresh snapshot that reflects the latest committed state. + * + * The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted + * by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT + * DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking. */ async blockRunWithWaitpoint({ runId, @@ -397,8 +413,10 @@ export class WaitpointSystem { return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); - //block the run with the waitpoints, returning how many waitpoints are pending - const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>` + // Insert the blocking connections and the historical run connections. + // We use a CTE to do both inserts atomically. Data-modifying CTEs are + // always executed regardless of whether they're referenced in the outer query. + await prisma.$queryRaw` WITH inserted AS ( INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex") SELECT @@ -423,12 +441,21 @@ export class WaitpointSystem { WHERE w.id IN (${Prisma.join($waitpoints)}) ON CONFLICT DO NOTHING ) + SELECT COUNT(*) FROM inserted`; + + // Check if the run is actually blocked using a separate query. + // This MUST be a separate statement from the CTE above because in READ COMMITTED + // isolation, each statement gets its own snapshot. The CTE's snapshot is taken when + // it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE + // won't see it. This fresh query gets a new snapshot that reflects the latest commits. + const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>` SELECT COUNT(*) as pending_count - FROM inserted i - JOIN "Waitpoint" w ON w.id = i."waitpointId" - WHERE w.status = 'PENDING';`; + FROM "Waitpoint" + WHERE id IN (${Prisma.join($waitpoints)}) + AND status = 'PENDING' + `; - const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0; + const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0; let newStatus: TaskRunExecutionStatus = "SUSPENDED"; if (