diff --git a/common/changes/@microsoft/rush-lib/fix_weighted-concurrency-cap_2026-02-19-07-57.json b/common/changes/@microsoft/rush-lib/fix_weighted-concurrency-cap_2026-02-19-07-57.json new file mode 100644 index 00000000000..42da0861fef --- /dev/null +++ b/common/changes/@microsoft/rush-lib/fix_weighted-concurrency-cap_2026-02-19-07-57.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush-lib", + "comment": "Fix weighted concurrency budget being capped by operation count", + "type": "patch" + } + ], + "packageName": "@microsoft/rush-lib" +} diff --git a/common/changes/@microsoft/rush/fix-weighted-concurrency-cap_2026-02-19-17-35.json b/common/changes/@microsoft/rush/fix-weighted-concurrency-cap_2026-02-19-17-35.json new file mode 100644 index 00000000000..c1aa27f2b1a --- /dev/null +++ b/common/changes/@microsoft/rush/fix-weighted-concurrency-cap_2026-02-19-17-35.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "comment": "", + "type": "none", + "packageName": "@microsoft/rush" + } + ], + "packageName": "@microsoft/rush", + "email": "huytrngqu@users.noreply.github.com" +} \ No newline at end of file diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index b0f2a3e0cb2..976a214f026 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -259,9 +259,13 @@ export class OperationExecutionManager { this._terminal.writeStdoutLine(''); } - this._terminal.writeStdoutLine(`Executing a maximum of ${this._parallelism} simultaneous processes...`); - - const maxParallelism: number = Math.min(totalOperations, this._parallelism); + // For display purposes, cap the reported number of simultaneous processes by the number of operations. + // This avoids confusing messages like "Executing a maximum of 10 simultaneous processes..." when + // there are only 4 operations. + const maxSimultaneousProcesses: number = Math.min(totalOperations, this._parallelism); + this._terminal.writeStdoutLine( + `Executing a maximum of ${maxSimultaneousProcesses} simultaneous processes...` + ); await this._beforeExecuteOperations?.(this._executionRecords); @@ -309,7 +313,10 @@ export class OperationExecutionManager { }, { allowOversubscription: this._allowOversubscription, - concurrency: maxParallelism, + // In weighted mode, concurrency represents the total "unit budget", not the max number of tasks. + // Do not cap by totalOperations, since that would incorrectly shrink the unit budget and + // reduce parallelism for operations with weight > 1. + concurrency: this._parallelism, weighted: true } ); diff --git a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts index 094db7caf5a..c74b2a9a3d9 100644 --- a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts @@ -18,6 +18,7 @@ jest.mock('@rushstack/terminal', () => { import { Terminal, MockWritable, PrintUtilities } from '@rushstack/terminal'; import { CollatedTerminal } from '@rushstack/stream-collator'; +import { Async } from '@rushstack/node-core-library'; import type { IPhase } from '../../../api/CommandLineConfiguration'; import type { RushConfigurationProject } from '../../../api/RushConfigurationProject'; @@ -454,4 +455,262 @@ describe(OperationExecutionManager.name, () => { expect(mockWritable.getFormattedChunks()).toMatchSnapshot(); }); }); + + describe('Weighted concurrency', () => { + function createWeightedOperation( + name: string, + weight: number, + counters: { concurrentCount: number; peakConcurrency: number } + ): Operation { + const operation: Operation = new Operation({ + runner: new MockOperationRunner(name, async (terminal: CollatedTerminal) => { + counters.concurrentCount++; + if (counters.concurrentCount > counters.peakConcurrency) { + counters.peakConcurrency = counters.concurrentCount; + } + await Async.sleepAsync(0); + if (counters.concurrentCount > counters.peakConcurrency) { + counters.peakConcurrency = counters.concurrentCount; + } + counters.concurrentCount--; + return OperationStatus.Success; + }), + phase: mockPhase, + project: getOrCreateProject(name), + logFilenameIdentifier: name + }); + operation.weight = weight; + return operation; + } + + it('does not cap the unit budget by the number of operations (issue #5607 regression)', async () => { + // Regression test for https://github.com/microsoft/rushstack/issues/5607 + // With weighted scheduling, concurrency is a unit budget. The old code passed + // Math.min(totalOperations, parallelism), which shrinks the budget when + // totalOperations < parallelism, causing serialization for weight > 1. + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const opA: Operation = createWeightedOperation('A', 4, counters); + const opB: Operation = createWeightedOperation('B', 4, counters); + const opC: Operation = createWeightedOperation('C', 4, counters); + const opD: Operation = createWeightedOperation('D', 4, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([opA, opB, opC, opD]), + { + quietMode: true, + debugMode: false, + parallelism: 10, + allowOversubscription: false, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toEqual(2); + }); + + it('clamps weight to budget and completes without deadlock when weight exceeds budget', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const opA: Operation = createWeightedOperation('heavy-A', 10, counters); + const opB: Operation = createWeightedOperation('heavy-B', 10, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([opA, opB]), + { + quietMode: true, + debugMode: false, + parallelism: 4, + allowOversubscription: false, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(result.operationResults.get(opA)?.status).toEqual(OperationStatus.Success); + expect(result.operationResults.get(opB)?.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toEqual(1); + }); + + it('allows oversubscription when allowOversubscription is true', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const opA: Operation = createWeightedOperation('over-A', 7, counters); + const opB: Operation = createWeightedOperation('over-B', 7, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([opA, opB]), + { + quietMode: true, + debugMode: false, + parallelism: 10, + allowOversubscription: true, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toEqual(2); + }); + + it('does not oversubscribe when allowOversubscription is false', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const opA: Operation = createWeightedOperation('strict-A', 7, counters); + const opB: Operation = createWeightedOperation('strict-B', 7, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([opA, opB]), + { + quietMode: true, + debugMode: false, + parallelism: 10, + allowOversubscription: false, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toEqual(1); + }); + + it('zero-weight operations do not consume budget', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const heavyOp: Operation = createWeightedOperation('heavy', 9, counters); + const zeroA: Operation = createWeightedOperation('zero-A', 0, counters); + const zeroB: Operation = createWeightedOperation('zero-B', 0, counters); + const zeroC: Operation = createWeightedOperation('zero-C', 0, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([heavyOp, zeroA, zeroB, zeroC]), + { + quietMode: true, + debugMode: false, + parallelism: 10, + allowOversubscription: false, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2); + }); + + it('mixed weights respect the unit budget correctly', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const opA: Operation = createWeightedOperation('mix-A', 5, counters); + const opB: Operation = createWeightedOperation('mix-B', 5, counters); + const opC: Operation = createWeightedOperation('mix-C', 3, counters); + const opD: Operation = createWeightedOperation('mix-D', 3, counters); + + const manager: OperationExecutionManager = new OperationExecutionManager( + new Set([opA, opB, opC, opD]), + { + quietMode: true, + debugMode: false, + parallelism: 10, + allowOversubscription: false, + destination: mockWritable + } + ); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + for (const [, opResult] of result.operationResults) { + expect(opResult.status).toEqual(OperationStatus.Success); + } + expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2); + expect(counters.peakConcurrency).toBeLessThanOrEqual(3); + }); + + it('weight=1 operations behave identically to unweighted scheduling', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const ops: Operation[] = []; + for (let i = 0; i < 5; i++) { + ops.push(createWeightedOperation(`unit-${i}`, 1, counters)); + } + + const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), { + quietMode: true, + debugMode: false, + parallelism: 3, + allowOversubscription: false, + destination: mockWritable + }); + + const abortController = new AbortController(); + const result: IExecutionResult = await manager.executeAsync(abortController); + + expect(result.status).toEqual(OperationStatus.Success); + expect(counters.peakConcurrency).toEqual(3); + }); + + it('displays the capped process count when parallelism exceeds operation count', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const ops: Operation[] = []; + for (let i = 0; i < 4; i++) { + ops.push(createWeightedOperation(`log-${i}`, 4, counters)); + } + + const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), { + quietMode: false, + debugMode: false, + parallelism: 10, + allowOversubscription: false, + destination: mockWritable + }); + + const abortController = new AbortController(); + await manager.executeAsync(abortController); + + const allOutput: string = mockWritable.getAllOutput(); + expect(allOutput).toContain('Executing a maximum of 4 simultaneous processes...'); + expect(allOutput).not.toContain('Executing a maximum of 10 simultaneous processes...'); + }); + + it('displays parallelism when it is less than operation count', async () => { + const counters = { concurrentCount: 0, peakConcurrency: 0 }; + + const ops: Operation[] = []; + for (let i = 0; i < 10; i++) { + ops.push(createWeightedOperation(`many-${i}`, 1, counters)); + } + + const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), { + quietMode: false, + debugMode: false, + parallelism: 3, + allowOversubscription: false, + destination: mockWritable + }); + + const abortController = new AbortController(); + await manager.executeAsync(abortController); + + const allOutput: string = mockWritable.getAllOutput(); + expect(allOutput).toContain('Executing a maximum of 3 simultaneous processes...'); + }); + }); });