Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush-lib",
"comment": "Fix weighted concurrency budget being capped by operation count",
"type": "patch"
}
],
"packageName": "@microsoft/rush-lib"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"comment": "",
"type": "none",
"packageName": "@microsoft/rush"
}
],
"packageName": "@microsoft/rush",
"email": "huytrngqu@users.noreply.github.com"
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,13 @@ export class OperationExecutionManager {
this._terminal.writeStdoutLine('');
}

this._terminal.writeStdoutLine(`Executing a maximum of ${this._parallelism} simultaneous processes...`);

const maxParallelism: number = Math.min(totalOperations, this._parallelism);
// For display purposes, cap the reported number of simultaneous processes by the number of operations.
// This avoids confusing messages like "Executing a maximum of 10 simultaneous processes..." when
// there are only 4 operations.
const maxSimultaneousProcesses: number = Math.min(totalOperations, this._parallelism);
this._terminal.writeStdoutLine(
`Executing a maximum of ${maxSimultaneousProcesses} simultaneous processes...`
);

await this._beforeExecuteOperations?.(this._executionRecords);

Expand Down Expand Up @@ -309,7 +313,10 @@ export class OperationExecutionManager {
},
{
allowOversubscription: this._allowOversubscription,
concurrency: maxParallelism,
// In weighted mode, concurrency represents the total "unit budget", not the max number of tasks.
// Do not cap by totalOperations, since that would incorrectly shrink the unit budget and
// reduce parallelism for operations with weight > 1.
concurrency: this._parallelism,
weighted: true
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jest.mock('@rushstack/terminal', () => {

import { Terminal, MockWritable, PrintUtilities } from '@rushstack/terminal';
import { CollatedTerminal } from '@rushstack/stream-collator';
import { Async } from '@rushstack/node-core-library';

import type { IPhase } from '../../../api/CommandLineConfiguration';
import type { RushConfigurationProject } from '../../../api/RushConfigurationProject';
Expand Down Expand Up @@ -454,4 +455,262 @@ describe(OperationExecutionManager.name, () => {
expect(mockWritable.getFormattedChunks()).toMatchSnapshot();
});
});

describe('Weighted concurrency', () => {
function createWeightedOperation(
name: string,
weight: number,
counters: { concurrentCount: number; peakConcurrency: number }
): Operation {
const operation: Operation = new Operation({
runner: new MockOperationRunner(name, async (terminal: CollatedTerminal) => {
counters.concurrentCount++;
if (counters.concurrentCount > counters.peakConcurrency) {
counters.peakConcurrency = counters.concurrentCount;
}
await Async.sleepAsync(0);
if (counters.concurrentCount > counters.peakConcurrency) {
counters.peakConcurrency = counters.concurrentCount;
}
counters.concurrentCount--;
return OperationStatus.Success;
}),
phase: mockPhase,
project: getOrCreateProject(name),
logFilenameIdentifier: name
});
operation.weight = weight;
return operation;
}

it('does not cap the unit budget by the number of operations (issue #5607 regression)', async () => {
// Regression test for https://github.com/microsoft/rushstack/issues/5607
// With weighted scheduling, concurrency is a unit budget. The old code passed
// Math.min(totalOperations, parallelism), which shrinks the budget when
// totalOperations < parallelism, causing serialization for weight > 1.
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const opA: Operation = createWeightedOperation('A', 4, counters);
const opB: Operation = createWeightedOperation('B', 4, counters);
const opC: Operation = createWeightedOperation('C', 4, counters);
const opD: Operation = createWeightedOperation('D', 4, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([opA, opB, opC, opD]),
{
quietMode: true,
debugMode: false,
parallelism: 10,
allowOversubscription: false,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toEqual(2);
});

it('clamps weight to budget and completes without deadlock when weight exceeds budget', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const opA: Operation = createWeightedOperation('heavy-A', 10, counters);
const opB: Operation = createWeightedOperation('heavy-B', 10, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([opA, opB]),
{
quietMode: true,
debugMode: false,
parallelism: 4,
allowOversubscription: false,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(result.operationResults.get(opA)?.status).toEqual(OperationStatus.Success);
expect(result.operationResults.get(opB)?.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toEqual(1);
});

it('allows oversubscription when allowOversubscription is true', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const opA: Operation = createWeightedOperation('over-A', 7, counters);
const opB: Operation = createWeightedOperation('over-B', 7, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([opA, opB]),
{
quietMode: true,
debugMode: false,
parallelism: 10,
allowOversubscription: true,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toEqual(2);
});

it('does not oversubscribe when allowOversubscription is false', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const opA: Operation = createWeightedOperation('strict-A', 7, counters);
const opB: Operation = createWeightedOperation('strict-B', 7, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([opA, opB]),
{
quietMode: true,
debugMode: false,
parallelism: 10,
allowOversubscription: false,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toEqual(1);
});

it('zero-weight operations do not consume budget', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const heavyOp: Operation = createWeightedOperation('heavy', 9, counters);
const zeroA: Operation = createWeightedOperation('zero-A', 0, counters);
const zeroB: Operation = createWeightedOperation('zero-B', 0, counters);
const zeroC: Operation = createWeightedOperation('zero-C', 0, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([heavyOp, zeroA, zeroB, zeroC]),
{
quietMode: true,
debugMode: false,
parallelism: 10,
allowOversubscription: false,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2);
});

it('mixed weights respect the unit budget correctly', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const opA: Operation = createWeightedOperation('mix-A', 5, counters);
const opB: Operation = createWeightedOperation('mix-B', 5, counters);
const opC: Operation = createWeightedOperation('mix-C', 3, counters);
const opD: Operation = createWeightedOperation('mix-D', 3, counters);

const manager: OperationExecutionManager = new OperationExecutionManager(
new Set([opA, opB, opC, opD]),
{
quietMode: true,
debugMode: false,
parallelism: 10,
allowOversubscription: false,
destination: mockWritable
}
);

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
for (const [, opResult] of result.operationResults) {
expect(opResult.status).toEqual(OperationStatus.Success);
}
expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2);
expect(counters.peakConcurrency).toBeLessThanOrEqual(3);
});

it('weight=1 operations behave identically to unweighted scheduling', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const ops: Operation[] = [];
for (let i = 0; i < 5; i++) {
ops.push(createWeightedOperation(`unit-${i}`, 1, counters));
}

const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
quietMode: true,
debugMode: false,
parallelism: 3,
allowOversubscription: false,
destination: mockWritable
});

const abortController = new AbortController();
const result: IExecutionResult = await manager.executeAsync(abortController);

expect(result.status).toEqual(OperationStatus.Success);
expect(counters.peakConcurrency).toEqual(3);
});

it('displays the capped process count when parallelism exceeds operation count', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const ops: Operation[] = [];
for (let i = 0; i < 4; i++) {
ops.push(createWeightedOperation(`log-${i}`, 4, counters));
}

const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
quietMode: false,
debugMode: false,
parallelism: 10,
allowOversubscription: false,
destination: mockWritable
});

const abortController = new AbortController();
await manager.executeAsync(abortController);

const allOutput: string = mockWritable.getAllOutput();
expect(allOutput).toContain('Executing a maximum of 4 simultaneous processes...');
expect(allOutput).not.toContain('Executing a maximum of 10 simultaneous processes...');
});

it('displays parallelism when it is less than operation count', async () => {
const counters = { concurrentCount: 0, peakConcurrency: 0 };

const ops: Operation[] = [];
for (let i = 0; i < 10; i++) {
ops.push(createWeightedOperation(`many-${i}`, 1, counters));
}

const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
quietMode: false,
debugMode: false,
parallelism: 3,
allowOversubscription: false,
destination: mockWritable
});

const abortController = new AbortController();
await manager.executeAsync(abortController);

const allOutput: string = mockWritable.getAllOutput();
expect(allOutput).toContain('Executing a maximum of 3 simultaneous processes...');
});
});
});