Commit 64ddf12

fix: prevent MVCC race in blockRunWithWaitpoint pending check
Split the CTE in blockRunWithWaitpoint so the pending waitpoint check is a separate SQL statement. In READ COMMITTED isolation, each statement gets its own snapshot, so a separate SELECT sees the latest committed state from concurrent completeWaitpoint calls.

Previously, the CTE did INSERT + pending check in one statement (one snapshot). If completeWaitpoint committed between the CTE start and the SELECT, the SELECT would still see PENDING due to the stale snapshot. Neither side would enqueue continueRunIfUnblocked, leaving the run stuck forever.
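
The snapshot behaviour described above can be illustrated with a toy in-memory model. This is only a sketch, not PostgreSQL and not the run-engine code: `ToyDb`, `beginStatement`, `singleStatementCheck`, and `separateStatementCheck` are hypothetical names invented for the illustration, and the model assumes the concurrent commit lands after the first statement has taken its snapshot.

```typescript
// Toy model of READ COMMITTED: each statement reads from a snapshot
// taken when the statement starts. All names here are hypothetical.
type Status = "PENDING" | "COMPLETED";

class ToyDb {
  private committed = new Map<string, Status>();

  // Make a status change visible to statements that start afterwards.
  commit(id: string, status: Status): void {
    this.committed.set(id, status);
  }

  // Starting a statement copies the committed state; later commits
  // are invisible to this snapshot.
  beginStatement(): ReadonlyMap<string, Status> {
    return new Map(this.committed);
  }
}

function countPending(snapshot: ReadonlyMap<string, Status>, ids: string[]): number {
  return ids.filter((id) => snapshot.get(id) === "PENDING").length;
}

// Old shape: inserts and pending check share one statement, hence one snapshot.
function singleStatementCheck(db: ToyDb, ids: string[], concurrentCommit: () => void): number {
  const snapshot = db.beginStatement();
  concurrentCommit(); // stands in for completeWaitpoint committing mid-statement
  return countPending(snapshot, ids);
}

// New shape: the pending check is its own statement with a fresh snapshot.
function separateStatementCheck(db: ToyDb, ids: string[], concurrentCommit: () => void): number {
  db.beginStatement(); // statement 1: the inserts
  concurrentCommit();
  const fresh = db.beginStatement(); // statement 2: the pending check
  return countPending(fresh, ids);
}

// Run one scenario: a single pending waitpoint that completes concurrently.
function runScenario(check: typeof singleStatementCheck): number {
  const db = new ToyDb();
  db.commit("wp_1", "PENDING");
  return check(db, ["wp_1"], () => db.commit("wp_1", "COMPLETED"));
}

const staleCount = runScenario(singleStatementCheck);
const freshCount = runScenario(separateStatementCheck);
```

In this model the single-statement variant still counts the waitpoint as pending, while the two-statement variant sees it as completed, which is the difference the commit relies on.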
1 parent 921285c commit 64ddf12

1 file changed, +17 -6 lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 17 additions & 6 deletions
@@ -397,8 +397,10 @@ export class WaitpointSystem {
     return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
       let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
 
-      //block the run with the waitpoints, returning how many waitpoints are pending
-      const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
+      // Insert the blocking connections and the historical run connections.
+      // We use a CTE to do both inserts atomically. Data-modifying CTEs are
+      // always executed regardless of whether they're referenced in the outer query.
+      await prisma.$queryRaw`
         WITH inserted AS (
           INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
           SELECT
@@ -423,12 +425,21 @@ export class WaitpointSystem {
           WHERE w.id IN (${Prisma.join($waitpoints)})
           ON CONFLICT DO NOTHING
         )
+        SELECT COUNT(*) FROM inserted`;
+
+      // Check if the run is actually blocked using a separate query.
+      // This MUST be a separate statement from the CTE above because in READ COMMITTED
+      // isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
+      // it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
+      // won't see it. This fresh query gets a new snapshot that reflects the latest commits.
+      const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
         SELECT COUNT(*) as pending_count
-        FROM inserted i
-        JOIN "Waitpoint" w ON w.id = i."waitpointId"
-        WHERE w.status = 'PENDING';`;
+        FROM "Waitpoint"
+        WHERE id IN (${Prisma.join($waitpoints)})
+        AND status = 'PENDING'
+      `;
 
-      const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
+      const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;
 
       let newStatus: TaskRunExecutionStatus = "SUSPENDED";
       if (
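
The `isRunBlocked` line in the hunk above turns the single-row COUNT(*) result into a boolean. A rough standalone sketch of that conversion follows, assuming the raw query yields at most one row whose `pending_count` is a JavaScript bigint (as the `BigInt` annotation in the diff suggests). The helper name `hasPendingWaitpoints` and the `PendingCountRow` type are made up for illustration and are not part of the commit.

```typescript
// Hypothetical row shape for the pending-count query.
// `bigint` (lowercase) is the primitive type for BigInt values.
type PendingCountRow = { pending_count: bigint };

// Returns true when at least one waitpoint is still pending.
// An empty result set is treated as "nothing pending", mirroring
// the `?? 0` fallback used in the diff.
function hasPendingWaitpoints(rows: PendingCountRow[]): boolean {
  const count = rows[0]?.pending_count ?? BigInt(0);
  return count > BigInt(0);
}
```

Comparing bigints directly avoids the `Number(...)` conversion, although for a row count either approach gives the same answer in practice.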
