Skip to content

Commit 5c9689c

Browse files
committed
correctly clear runs from env current concurrency sets when dequeued from the ttl system
1 parent 09bf344 commit 5c9689c

File tree

2 files changed

+118
-3
lines changed

2 files changed

+118
-3
lines changed

internal-packages/run-engine/src/engine/tests/ttl.test.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,119 @@ describe("RunEngine ttl", () => {
589589
}
590590
);
591591

592+
containerTest(
593+
"TTL expiration clears env concurrency keys with proj segment",
594+
async ({ prisma, redisOptions }) => {
595+
const authenticatedEnvironment =
596+
await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
597+
598+
const engine = new RunEngine({
599+
prisma,
600+
worker: {
601+
disabled: true,
602+
redis: redisOptions,
603+
workers: 1,
604+
tasksPerWorker: 10,
605+
pollIntervalMs: 100,
606+
},
607+
queue: {
608+
redis: redisOptions,
609+
processWorkerQueueDebounceMs: 50,
610+
masterQueueConsumersDisabled: true,
611+
ttlSystem: {
612+
pollIntervalMs: 5000,
613+
batchSize: 10,
614+
},
615+
},
616+
runLock: {
617+
redis: redisOptions,
618+
},
619+
machines: {
620+
defaultMachine: "small-1x",
621+
machines: {
622+
"small-1x": {
623+
name: "small-1x" as const,
624+
cpu: 0.5,
625+
memory: 0.5,
626+
centsPerMs: 0.0001,
627+
},
628+
},
629+
baseCostInCents: 0.0001,
630+
},
631+
tracer: trace.getTracer("test", "0.0.0"),
632+
});
633+
634+
try {
635+
const taskIdentifier = "test-task";
636+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
637+
638+
const run = await engine.trigger(
639+
{
640+
number: 1,
641+
friendlyId: "run_envkeys",
642+
environment: authenticatedEnvironment,
643+
taskIdentifier,
644+
payload: "{}",
645+
payloadType: "application/json",
646+
context: {},
647+
traceContext: {},
648+
traceId: "t1",
649+
spanId: "s1",
650+
workerQueue: "main",
651+
queue: "task/test-task",
652+
isTest: false,
653+
tags: [],
654+
ttl: "1s",
655+
},
656+
prisma
657+
);
658+
659+
const queue = engine.runQueue.keys.queueKey(
660+
authenticatedEnvironment,
661+
"task/test-task"
662+
);
663+
const envConcurrencyKey =
664+
engine.runQueue.keys.envCurrentConcurrencyKeyFromQueue(queue);
665+
const envDequeuedKey =
666+
engine.runQueue.keys.envCurrentDequeuedKeyFromQueue(queue);
667+
668+
await engine.runQueue.redis.sadd(envConcurrencyKey, run.id);
669+
await engine.runQueue.redis.sadd(envDequeuedKey, run.id);
670+
671+
const concurrencyBefore = await engine.runQueue.getCurrentConcurrencyOfEnvironment(
672+
authenticatedEnvironment
673+
);
674+
expect(concurrencyBefore).toContain(run.id);
675+
676+
await setTimeout(1_500);
677+
await engine.runQueue.processMasterQueueForEnvironment(
678+
authenticatedEnvironment.id,
679+
10
680+
);
681+
await setTimeout(7_000);
682+
683+
const expiredRun = await prisma.taskRun.findUnique({
684+
where: { id: run.id },
685+
select: { status: true },
686+
});
687+
expect(expiredRun?.status).toBe("EXPIRED");
688+
689+
const concurrencyAfter = await engine.runQueue.getCurrentConcurrencyOfEnvironment(
690+
authenticatedEnvironment
691+
);
692+
expect(concurrencyAfter).not.toContain(run.id);
693+
694+
const stillInDequeued = await engine.runQueue.redis.sismember(
695+
envDequeuedKey,
696+
run.id
697+
);
698+
expect(stillInDequeued).toBe(0);
699+
} finally {
700+
await engine.quit();
701+
}
702+
}
703+
);
704+
592705
containerTest(
593706
"Dequeue returns non-expired runs while skipping expired ones",
594707
async ({ prisma, redisOptions }) => {

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2656,9 +2656,11 @@ for i, member in ipairs(expiredMembers) do
26562656
redis.call('SREM', concurrencyKey, runId)
26572657
redis.call('SREM', dequeuedKey, runId)
26582658
2659-
-- Env concurrency (derive from rawQueueKey)
2660-
local envConcurrencyKey = keyPrefix .. "{org:" .. orgFromQueue .. "}:env:" .. (envMatch or "") .. ":currentConcurrency"
2661-
local envDequeuedKey = keyPrefix .. "{org:" .. orgFromQueue .. "}:env:" .. (envMatch or "") .. ":currentDequeued"
2659+
-- Env concurrency (derive from rawQueueKey; must match RunQueueKeyProducer: org + proj + env)
2660+
-- rawQueueKey format: {org:X}:proj:Y:env:Z:queue:Q[:ck:C]
2661+
local projMatch = string.match(rawQueueKey, ":proj:([^:]+):env:")
2662+
local envConcurrencyKey = keyPrefix .. "{org:" .. orgFromQueue .. "}:proj:" .. (projMatch or "") .. ":env:" .. (envMatch or "") .. ":currentConcurrency"
2663+
local envDequeuedKey = keyPrefix .. "{org:" .. orgFromQueue .. "}:proj:" .. (projMatch or "") .. ":env:" .. (envMatch or "") .. ":currentDequeued"
26622664
redis.call('SREM', envConcurrencyKey, runId)
26632665
redis.call('SREM', envDequeuedKey, runId)
26642666

0 commit comments

Comments
 (0)