Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,22 @@ export class WaitpointSystem {

/**
* Prevents a run from continuing until the waitpoint is completed.
*
* This method uses two separate SQL statements intentionally:
*
* 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and
* _WaitpointRunConnections rows (historical connections).
*
* 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING.
*
* These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation:
* each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between
* the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using
* a separate SELECT, we get a fresh snapshot that reflects the latest committed state.
*
* The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted
* by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT
* DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking.
*/
async blockRunWithWaitpoint({
runId,
Expand Down Expand Up @@ -397,8 +413,10 @@ export class WaitpointSystem {
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);

//block the run with the waitpoints, returning how many waitpoints are pending
const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
// Insert the blocking connections and the historical run connections.
// We use a CTE to do both inserts atomically. Data-modifying CTEs are
// always executed regardless of whether they're referenced in the outer query.
await prisma.$queryRaw`
WITH inserted AS (
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
SELECT
Expand All @@ -423,12 +441,21 @@ export class WaitpointSystem {
WHERE w.id IN (${Prisma.join($waitpoints)})
ON CONFLICT DO NOTHING
)
SELECT COUNT(*) FROM inserted`;

// Check if the run is actually blocked using a separate query.
// This MUST be a separate statement from the CTE above because in READ COMMITTED
// isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
// it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
// won't see it. This fresh query gets a new snapshot that reflects the latest commits.
const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
SELECT COUNT(*) as pending_count
FROM inserted i
JOIN "Waitpoint" w ON w.id = i."waitpointId"
WHERE w.status = 'PENDING';`;
FROM "Waitpoint"
WHERE id IN (${Prisma.join($waitpoints)})
AND status = 'PENDING'
`;

const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;

let newStatus: TaskRunExecutionStatus = "SUSPENDED";
if (
Expand Down