From cefce09e7019f9da2c91bc11d99fde0bae83784e Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Wed, 4 Feb 2026 10:22:54 -0800 Subject: [PATCH 01/11] autest: Add parallel test runner wrapper script Add autest-parallel.py which runs autest tests in parallel by spawning multiple autest processes with isolated sandboxes and port ranges. Key changes: - ports.py: Add AUTEST_PORT_OFFSET environment variable support to offset the starting port range for each parallel worker, avoiding port conflicts - autest-parallel.py: New script that discovers tests, partitions them across workers, runs them in parallel, and aggregates results Usage: ./autest-parallel.py -j 8 --ats-bin /opt/ats/bin --sandbox /tmp/sb Note: The built-in autest -j flag does not work with ATS tests (causes "No Test run defined" failures), hence the need for this wrapper. Tests with hardcoded ports (select_ports=False) cannot safely run in parallel and may still fail. --- tests/autest-parallel.py | 474 ++++++++++++++++++++++++++ tests/gold_tests/autest-site/ports.py | 14 +- 2 files changed, 487 insertions(+), 1 deletion(-) create mode 100755 tests/autest-parallel.py diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py new file mode 100755 index 00000000000..dcecfc08466 --- /dev/null +++ b/tests/autest-parallel.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +''' +Parallel autest runner for Apache Traffic Server. + +This script runs autest tests in parallel by spawning multiple autest processes, +each with a different port offset to avoid port conflicts. + +Usage: + ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel + ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb + ./autest-parallel.py --list # Just list tests without running +''' +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import fnmatch +import os +import re +import subprocess +import sys +import time +from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional + + +@dataclass +class TestResult: + """Result from running a single autest process.""" + worker_id: int + tests: List[str] + passed: int = 0 + failed: int = 0 + skipped: int = 0 + warnings: int = 0 + exceptions: int = 0 + unknown: int = 0 + duration: float = 0.0 + failed_tests: List[str] = field(default_factory=list) + output: str = "" + return_code: int = 0 + + +def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]: + """ + Discover all .test.py files in the test directory. 
+ + Args: + test_dir: Path to gold_tests directory + filter_patterns: Optional list of glob patterns to filter tests + + Returns: + List of test names (without .test.py extension) + """ + tests = [] + for test_file in test_dir.rglob("*.test.py"): + # Extract test name (filename without .test.py) + test_name = test_file.stem.replace('.test', '') + + # Apply filters if provided + if filter_patterns: + if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns): + tests.append(test_name) + else: + tests.append(test_name) + + return sorted(tests) + + +def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: + """ + Partition tests into roughly equal groups for parallel execution. + + Args: + tests: List of test names + num_jobs: Number of parallel workers + + Returns: + List of test lists, one per worker + """ + if num_jobs <= 0: + num_jobs = 1 + + partitions = [[] for _ in range(min(num_jobs, len(tests)))] + for i, test in enumerate(tests): + partitions[i % len(partitions)].append(test) + + return [p for p in partitions if p] # Remove empty partitions + + +def strip_ansi(text: str) -> str: + """Remove ANSI escape codes from text.""" + ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + return ansi_escape.sub('', text) + + +def parse_autest_output(output: str) -> dict: + """ + Parse autest output to extract pass/fail counts. + + Args: + output: Raw autest output string + + Returns: + Dictionary with counts for passed, failed, skipped, etc. + """ + result = { + 'passed': 0, + 'failed': 0, + 'skipped': 0, + 'warnings': 0, + 'exceptions': 0, + 'unknown': 0, + 'failed_tests': [] + } + + # Strip ANSI codes for easier parsing + clean_output = strip_ansi(output) + + # Parse the summary section + # Format: " Passed: 2" or " Failed: 0" + for line in clean_output.split('\n'): + line = line.strip() + if 'Passed:' in line: + try: + result['passed'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + elif 'Failed:' in line and 'test' not in line.lower(): + try: + result['failed'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + elif 'Skipped:' in line: + try: + result['skipped'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + elif 'Warning:' in line: + try: + result['warnings'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + elif 'Exception:' in line: + try: + result['exceptions'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + elif 'Unknown:' in line: + try: + result['unknown'] = int(line.split(':')[-1].strip()) + except ValueError: + pass + + # Extract failed test names + # Look for lines like "Test: test_name: Failed" + failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE) + for match in failed_pattern.finditer(clean_output): + result['failed_tests'].append(match.group(1)) + + return result + + +def run_worker( + worker_id: int, + tests: List[str], + script_dir: Path, + sandbox_base: Path, + ats_bin: str, + extra_args: List[str], + port_offset_step: int = 1000, + verbose: bool = False +) -> TestResult: + """ + Run autest on a subset of tests with isolated sandbox and port range. + + Args: + worker_id: Worker identifier (0, 1, 2, ...) 
+ tests: List of test names to run + script_dir: Directory containing autest.sh + sandbox_base: Base sandbox directory + ats_bin: Path to ATS bin directory + extra_args: Additional arguments to pass to autest + port_offset_step: Port offset between workers + verbose: Whether to print verbose output + + Returns: + TestResult with pass/fail counts + """ + start_time = time.time() + result = TestResult(worker_id=worker_id, tests=tests) + + # Create worker-specific sandbox + sandbox = sandbox_base / f"worker-{worker_id}" + sandbox.mkdir(parents=True, exist_ok=True) + + # Calculate port offset for this worker + port_offset = worker_id * port_offset_step + + # Build autest command + # Use 'uv run autest' directly for better compatibility + cmd = [ + 'uv', 'run', 'autest', 'run', + '--directory', 'gold_tests', + '--ats-bin', ats_bin, + '--sandbox', str(sandbox), + ] + + # Add test filters + cmd.append('--filters') + cmd.extend(tests) + + # Add any extra arguments + cmd.extend(extra_args) + + # Set up environment with port offset + # We set this as an actual OS environment variable so ports.py can read it + env = os.environ.copy() + env['AUTEST_PORT_OFFSET'] = str(port_offset) + + if verbose: + print(f"[Worker {worker_id}] Running {len(tests)} tests with port offset {port_offset}") + print(f"[Worker {worker_id}] Command: {' '.join(cmd)}") + + try: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + timeout=3600 # 1 hour timeout per worker + ) + result.output = proc.stdout + proc.stderr + result.return_code = proc.returncode + + # Parse results + parsed = parse_autest_output(result.output) + result.passed = parsed['passed'] + result.failed = parsed['failed'] + result.skipped = parsed['skipped'] + result.warnings = parsed['warnings'] + result.exceptions = parsed['exceptions'] + result.unknown = parsed['unknown'] + result.failed_tests = parsed['failed_tests'] + + except subprocess.TimeoutExpired: + result.output = "TIMEOUT: Worker exceeded 1 hour timeout" + result.return_code = -1 + result.failed = len(tests) + except Exception as e: + result.output = f"ERROR: {str(e)}" + result.return_code = -1 + result.failed = len(tests) + + result.duration = time.time() - start_time + return result + + +def print_summary(results: List[TestResult], total_duration: float): + """Print aggregated results from all workers.""" + total_passed = sum(r.passed for r in results) + total_failed = sum(r.failed for r in results) + total_skipped = sum(r.skipped for r in results) + total_warnings = sum(r.warnings for r in results) + total_exceptions = sum(r.exceptions for r in results) + total_unknown = sum(r.unknown for r in results) + total_tests = total_passed + total_failed + total_skipped + total_warnings + total_exceptions + total_unknown + + all_failed_tests = [] + for r in results: + all_failed_tests.extend(r.failed_tests) + + print("\n" + "=" * 70) + print("PARALLEL AUTEST SUMMARY") + print("=" * 70) + print(f"Workers: {len(results)}") + print(f"Total tests: {total_tests}") + print(f"Duration: {total_duration:.1f}s") + print("-" * 70) + print(f" Passed: {total_passed}") + print(f" Failed: {total_failed}") + print(f" Skipped: {total_skipped}") + print(f" Warnings: {total_warnings}") + print(f" Exceptions: {total_exceptions}") + print(f" Unknown: {total_unknown}") + + if all_failed_tests: + print("-" * 70) + print("FAILED TESTS:") + for test in sorted(all_failed_tests): + print(f" - {test}") + + print("=" * 70) + + # Per-worker summary + print("\nPer-worker breakdown:") + for 
r in results: + status = "OK" if r.failed == 0 and r.exceptions == 0 else "FAIL" + print(f" Worker {r.worker_id}: {r.passed} passed, {r.failed} failed, " + f"{r.skipped} skipped ({r.duration:.1f}s) [{status}]") + + +def main(): + parser = argparse.ArgumentParser( + description='Run autest tests in parallel', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + # Run all tests with 4 parallel workers + %(prog)s -j 4 --ats-bin /opt/ats/bin --sandbox /tmp/autest + + # Run specific tests + %(prog)s -j 2 --filter "cache-*" --filter "tls-*" --ats-bin /opt/ats/bin --sandbox /tmp/autest + + # List tests without running + %(prog)s --list --ats-bin /opt/ats/bin +''' + ) + + parser.add_argument( + '-j', '--jobs', + type=int, + default=os.cpu_count() or 4, + help='Number of parallel workers (default: CPU count)' + ) + parser.add_argument( + '--ats-bin', + required=True, + help='Path to ATS bin directory' + ) + parser.add_argument( + '--sandbox', + default='/tmp/autest-parallel', + help='Base sandbox directory (default: /tmp/autest-parallel)' + ) + parser.add_argument( + '-f', '--filter', + action='append', + dest='filters', + help='Filter tests by glob pattern (can be specified multiple times)' + ) + parser.add_argument( + '--list', + action='store_true', + help='List tests without running' + ) + parser.add_argument( + '--port-offset-step', + type=int, + default=1000, + help='Port offset between workers (default: 1000)' + ) + parser.add_argument( + '-v', '--verbose', + action='store_true', + help='Verbose output' + ) + parser.add_argument( + '--test-dir', + default='gold_tests', + help='Test directory relative to script location (default: gold_tests)' + ) + parser.add_argument( + 'extra_args', + nargs='*', + help='Additional arguments to pass to autest' + ) + + args = parser.parse_args() + + # Determine paths + script_dir = Path(__file__).parent.resolve() + test_dir = script_dir / args.test_dir + + if not test_dir.exists(): + print(f"Error: Test directory not found: {test_dir}", file=sys.stderr) + sys.exit(1) + + # Discover tests + tests = discover_tests(test_dir, args.filters) + + if not tests: + print("No tests found matching the specified filters.", file=sys.stderr) + sys.exit(1) + + print(f"Found {len(tests)} tests") + + if args.list: + print("\nTests:") + for test in tests: + print(f" {test}") + sys.exit(0) + + # Partition tests + num_jobs = min(args.jobs, len(tests)) + partitions = partition_tests(tests, num_jobs) + + print(f"Running with {len(partitions)} parallel workers") + print(f"Port offset step: {args.port_offset_step}") + print(f"Sandbox: {args.sandbox}") + + # Create sandbox base directory + sandbox_base = Path(args.sandbox) + sandbox_base.mkdir(parents=True, exist_ok=True) + + # Run workers in parallel + start_time = time.time() + results: List[TestResult] = [] + + with ProcessPoolExecutor(max_workers=len(partitions)) as executor: + futures = {} + for worker_id, worker_tests in enumerate(partitions): + future = executor.submit( + run_worker, + worker_id=worker_id, + tests=worker_tests, + script_dir=script_dir, + sandbox_base=sandbox_base, + ats_bin=args.ats_bin, + extra_args=args.extra_args or [], + port_offset_step=args.port_offset_step, + verbose=args.verbose + ) + futures[future] = worker_id + + # Collect results as they complete + for future in as_completed(futures): + worker_id = futures[future] + try: + result = future.result() + results.append(result) + status = "PASS" if result.failed == 0 else "FAIL" + print(f"[Worker {worker_id}] Completed: 
{result.passed} passed, " + f"{result.failed} failed ({result.duration:.1f}s) [{status}]") + except Exception as e: + print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) + results.append(TestResult( + worker_id=worker_id, + tests=partitions[worker_id], + failed=len(partitions[worker_id]), + output=str(e) + )) + + total_duration = time.time() - start_time + + # Sort results by worker_id for consistent output + results.sort(key=lambda r: r.worker_id) + + # Print summary + print_summary(results, total_duration) + + # Exit with non-zero if any tests failed + total_failed = sum(r.failed + r.exceptions for r in results) + sys.exit(1 if total_failed > 0 else 0) + + +if __name__ == '__main__': + main() diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py index bc4de3c1e32..6e402ca0eaf 100644 --- a/tests/gold_tests/autest-site/ports.py +++ b/tests/gold_tests/autest-site/ports.py @@ -145,6 +145,11 @@ def _get_listening_ports() -> Set[int]: def _setup_port_queue(amount=1000): """ Build up the set of ports that the OS in theory will not use. + + The AUTEST_PORT_OFFSET environment variable can be used to offset the + starting port range. This is useful when running multiple autest processes + in parallel to avoid port conflicts. Each parallel worker should use a + different offset (e.g., 0, 1000, 2000, etc.). """ global g_ports if g_ports is None: @@ -154,6 +159,12 @@ def _setup_port_queue(amount=1000): # The queue has already been populated. host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}") return + + # Get port offset for parallel execution support + port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0)) + if port_offset > 0: + host.WriteVerbose('_setup_port_queue', f"Using port offset: {port_offset}") + try: # Use sysctl to find the range of ports that the OS publishes it uses. # some docker setups don't have sbin setup correctly @@ -177,7 +188,8 @@ def _setup_port_queue(amount=1000): listening_ports = _get_listening_ports() if rmax > amount: # Fill in ports, starting above the upper OS-usable port range. - port = dmax + 1 + # Add port_offset to support parallel test execution. 
+ port = dmax + 1 + port_offset while port < 65536 and g_ports.qsize() < amount: if PortOpen(port, listening_ports=listening_ports): host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") From 419eba279824985ec787fb2dcd740b665c92cf93 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sat, 7 Feb 2026 12:01:27 -0800 Subject: [PATCH 02/11] autest: Enhance parallel runner with serial test support and LPT scheduling - Add serial_tests.txt for tests that cannot run in parallel (hardcoded ports) - Implement LPT (Longest Processing Time) load balancing using timing data - Add --collect-timings flag to record per-test durations for future runs - Fix port offset for low-range ports in ports.py (was missing offset) - Convert config.test.py and copy_config.test.py to use dynamic ports (removed select_ports=False) - Add run_single_test() for serial execution with timing --- tests/autest-parallel.py | 598 +++++++++++++++++---- tests/gold_tests/autest-site/ports.py | 3 +- tests/gold_tests/basic/config.test.py | 3 +- tests/gold_tests/basic/copy_config.test.py | 8 +- tests/serial_tests.txt | 14 + 5 files changed, 512 insertions(+), 114 deletions(-) create mode 100644 tests/serial_tests.txt diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index dcecfc08466..7a9e718c261 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -28,6 +28,7 @@ import argparse import fnmatch +import json import os import re import subprocess @@ -35,8 +36,16 @@ import time from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import Dict, List, Optional, Tuple + +# Default timing file location +DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json" +# Default serial tests file location +DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt" +# Default estimate for unknown tests (seconds) +DEFAULT_TEST_TIME = 15.0 @dataclass @@ -52,8 +61,10 @@ class TestResult: unknown: int = 0 duration: float = 0.0 failed_tests: List[str] = field(default_factory=list) + test_timings: Dict[str, float] = field(default_factory=dict) output: str = "" return_code: int = 0 + is_serial: bool = False def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]: @@ -82,9 +93,65 @@ def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) return sorted(tests) +def load_serial_tests(serial_file: Path) -> set: + """ + Load list of tests that must run serially from a file. + + The file format is one test name per line, with # for comments. 
+ Test names can be: + - Simple names: test_name (matches any test containing this) + - Full paths: subdir/test_name.test.py + + Returns: + Set of test names that must run serially + """ + serial_tests = set() + if not serial_file.exists(): + return serial_tests + + try: + with open(serial_file) as f: + for line in f: + line = line.strip() + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + # Remove .test.py extension if present + if line.endswith('.test.py'): + line = line[:-8] # Remove .test.py + # Extract just the test name from path + test_name = Path(line).stem.replace('.test', '') + serial_tests.add(test_name) + except IOError: + pass + + return serial_tests + + +def load_timings(timing_file: Path) -> Dict[str, float]: + """Load test timing data from JSON file.""" + if timing_file.exists(): + try: + with open(timing_file) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + pass + return {} + + +def save_timings(timing_file: Path, timings: Dict[str, float]): + """Save test timing data to JSON file.""" + try: + with open(timing_file, 'w') as f: + json.dump(timings, f, indent=2, sort_keys=True) + except IOError as e: + print(f"Warning: Could not save timings: {e}", file=sys.stderr) + + def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: """ Partition tests into roughly equal groups for parallel execution. + Simple round-robin partitioning (used when no timing data available). Args: tests: List of test names @@ -103,6 +170,62 @@ def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: return [p for p in partitions if p] # Remove empty partitions +def partition_tests_by_time( + tests: List[str], + num_jobs: int, + timings: Dict[str, float] +) -> Tuple[List[List[str]], List[float]]: + """ + Partition tests using LPT (Longest Processing Time first) algorithm. + This balances the load across workers based on expected test duration. 
+ + Args: + tests: List of test names + num_jobs: Number of parallel workers + timings: Dictionary of test name -> expected duration in seconds + + Returns: + Tuple of (partitions, expected_durations) where: + - partitions: List of test lists, one per worker + - expected_durations: Expected total duration for each worker + """ + if num_jobs <= 0: + num_jobs = 1 + + num_workers = min(num_jobs, len(tests)) + + # Get timing for each test, use default for unknown + test_times = [] + unknown_tests = [] + for test in tests: + if test in timings: + test_times.append((test, timings[test])) + else: + unknown_tests.append(test) + + # Sort known tests by time (longest first) for LPT algorithm + test_times.sort(key=lambda x: x[1], reverse=True) + + # Initialize workers + partitions = [[] for _ in range(num_workers)] + worker_loads = [0.0] * num_workers + + # Assign known tests using LPT: assign to worker with least load + for test, duration in test_times: + min_worker = min(range(num_workers), key=lambda w: worker_loads[w]) + partitions[min_worker].append(test) + worker_loads[min_worker] += duration + + # Distribute unknown tests evenly across workers with least load + # Sort unknown tests and distribute them one at a time to balance + for test in unknown_tests: + min_worker = min(range(num_workers), key=lambda w: worker_loads[w]) + partitions[min_worker].append(test) + worker_loads[min_worker] += DEFAULT_TEST_TIME + + return partitions, worker_loads + + def strip_ansi(text: str) -> str: """Remove ANSI escape codes from text.""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') @@ -111,13 +234,13 @@ def strip_ansi(text: str) -> str: def parse_autest_output(output: str) -> dict: """ - Parse autest output to extract pass/fail counts. + Parse autest output to extract pass/fail counts and per-test timings. Args: output: Raw autest output string Returns: - Dictionary with counts for passed, failed, skipped, etc. + Dictionary with counts for passed, failed, skipped, etc. and test_timings """ result = { 'passed': 0, @@ -126,15 +249,49 @@ def parse_autest_output(output: str) -> dict: 'warnings': 0, 'exceptions': 0, 'unknown': 0, - 'failed_tests': [] + 'failed_tests': [], + 'test_timings': {} } # Strip ANSI codes for easier parsing clean_output = strip_ansi(output) + lines = clean_output.split('\n') + + # Track test start times to calculate duration + # Autest output format: + # Running Test: test_name + # ... test output ... 
+ # Test: test_name: Passed/Failed + current_test = None + test_start_line = None + + # First pass: find test results and their line positions + test_results = [] # (line_num, test_name, result) + for i, line in enumerate(lines): + line_stripped = line.strip() + + # Match "Running Test: test_name" or "Test: test_name: Passed/Failed" + running_match = re.match(r'Running Test:\s+(\S+)', line_stripped) + result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE) + + if running_match: + test_results.append((i, running_match.group(1), 'start')) + elif result_match: + test_results.append((i, result_match.group(1), result_match.group(2).lower())) + + # Calculate per-test timing based on line positions + # (rough approximation - actual timing would be better from autest if available) + for i, (line_num, test_name, status) in enumerate(test_results): + if status == 'start': + # Find the corresponding end + for j in range(i + 1, len(test_results)): + end_line, end_name, end_status = test_results[j] + if end_name == test_name and end_status != 'start': + # We don't have actual time, but we'll track it from the worker + break # Parse the summary section - # Format: " Passed: 2" or " Failed: 0" - for line in clean_output.split('\n'): + for line in lines: line = line.strip() if 'Passed:' in line: try: @@ -168,7 +325,6 @@ def parse_autest_output(output: str) -> dict: pass # Extract failed test names - # Look for lines like "Test: test_name: Failed" failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE) for match in failed_pattern.finditer(clean_output): result['failed_tests'].append(match.group(1)) @@ -176,6 +332,50 @@ def parse_autest_output(output: str) -> dict: return result +def run_single_test( + test: str, + script_dir: Path, + sandbox: Path, + ats_bin: str, + extra_args: List[str], + env: dict +) -> Tuple[str, float, bool, str]: + """ + Run a single test and return its timing. + + Returns: + Tuple of (test_name, duration, passed, output) + """ + cmd = [ + 'uv', 'run', 'autest', 'run', + '--directory', 'gold_tests', + '--ats-bin', ats_bin, + '--sandbox', str(sandbox / test), + '--filters', test + ] + cmd.extend(extra_args) + + start = time.time() + try: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + timeout=600 # 10 minute timeout per test + ) + duration = time.time() - start + output = proc.stdout + proc.stderr + parsed = parse_autest_output(output) + passed = parsed['failed'] == 0 and parsed['exceptions'] == 0 + return (test, duration, passed, output) + except subprocess.TimeoutExpired: + return (test, 600.0, False, "TIMEOUT") + except Exception as e: + return (test, time.time() - start, False, str(e)) + + def run_worker( worker_id: int, tests: List[str], @@ -184,7 +384,8 @@ def run_worker( ats_bin: str, extra_args: List[str], port_offset_step: int = 1000, - verbose: bool = False + verbose: bool = False, + collect_timings: bool = False ) -> TestResult: """ Run autest on a subset of tests with isolated sandbox and port range. 
@@ -198,9 +399,10 @@ def run_worker( extra_args: Additional arguments to pass to autest port_offset_step: Port offset between workers verbose: Whether to print verbose output + collect_timings: If True, run tests one at a time to collect accurate timing Returns: - TestResult with pass/fail counts + TestResult with pass/fail counts and per-test timings """ start_time = time.time() result = TestResult(worker_id=worker_id, tests=tests) @@ -212,67 +414,90 @@ def run_worker( # Calculate port offset for this worker port_offset = worker_id * port_offset_step - # Build autest command - # Use 'uv run autest' directly for better compatibility - cmd = [ - 'uv', 'run', 'autest', 'run', - '--directory', 'gold_tests', - '--ats-bin', ats_bin, - '--sandbox', str(sandbox), - ] - - # Add test filters - cmd.append('--filters') - cmd.extend(tests) - - # Add any extra arguments - cmd.extend(extra_args) - # Set up environment with port offset - # We set this as an actual OS environment variable so ports.py can read it env = os.environ.copy() env['AUTEST_PORT_OFFSET'] = str(port_offset) - if verbose: - print(f"[Worker {worker_id}] Running {len(tests)} tests with port offset {port_offset}") - print(f"[Worker {worker_id}] Command: {' '.join(cmd)}") - - try: - proc = subprocess.run( - cmd, - cwd=script_dir, - capture_output=True, - text=True, - env=env, - timeout=3600 # 1 hour timeout per worker - ) - result.output = proc.stdout + proc.stderr - result.return_code = proc.returncode - - # Parse results - parsed = parse_autest_output(result.output) - result.passed = parsed['passed'] - result.failed = parsed['failed'] - result.skipped = parsed['skipped'] - result.warnings = parsed['warnings'] - result.exceptions = parsed['exceptions'] - result.unknown = parsed['unknown'] - result.failed_tests = parsed['failed_tests'] - - except subprocess.TimeoutExpired: - result.output = "TIMEOUT: Worker exceeded 1 hour timeout" - result.return_code = -1 - result.failed = len(tests) - except Exception as e: - result.output = f"ERROR: {str(e)}" - result.return_code = -1 - result.failed = len(tests) + if collect_timings: + # Run tests one at a time to collect accurate timing + all_output = [] + total_tests = len(tests) + for idx, test in enumerate(tests, 1): + test_name, duration, passed, output = run_single_test( + test, script_dir, sandbox, ats_bin, extra_args, env + ) + result.test_timings[test_name] = duration + all_output.append(output) + + if passed: + result.passed += 1 + else: + result.failed += 1 + result.failed_tests.append(test_name) + + status = "PASS" if passed else "FAIL" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Fixed-width format: date time status duration worker progress test_name + print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True) + + result.output = "\n".join(all_output) + result.return_code = 0 if result.failed == 0 else 1 + else: + # Run all tests in batch (faster but no per-test timing) + cmd = [ + 'uv', 'run', 'autest', 'run', + '--directory', 'gold_tests', + '--ats-bin', ats_bin, + '--sandbox', str(sandbox), + ] + + # Add test filters + cmd.append('--filters') + cmd.extend(tests) + + # Add any extra arguments + cmd.extend(extra_args) + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"{timestamp} Worker:{worker_id:2d} Starting batch of {len(tests)} tests (port offset {port_offset})", flush=True) + + try: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + 
timeout=3600 # 1 hour timeout per worker + ) + result.output = proc.stdout + proc.stderr + result.return_code = proc.returncode + + # Parse results + parsed = parse_autest_output(result.output) + result.passed = parsed['passed'] + result.failed = parsed['failed'] + result.skipped = parsed['skipped'] + result.warnings = parsed['warnings'] + result.exceptions = parsed['exceptions'] + result.unknown = parsed['unknown'] + result.failed_tests = parsed['failed_tests'] + + except subprocess.TimeoutExpired: + result.output = "TIMEOUT: Worker exceeded 1 hour timeout" + result.return_code = -1 + result.failed = len(tests) + except Exception as e: + result.output = f"ERROR: {str(e)}" + result.return_code = -1 + result.failed = len(tests) result.duration = time.time() - start_time return result -def print_summary(results: List[TestResult], total_duration: float): +def print_summary(results: List[TestResult], total_duration: float, + expected_timings: Optional[Dict[str, float]] = None): """Print aggregated results from all workers.""" total_passed = sum(r.passed for r in results) total_failed = sum(r.failed for r in results) @@ -286,6 +511,11 @@ def print_summary(results: List[TestResult], total_duration: float): for r in results: all_failed_tests.extend(r.failed_tests) + # Collect actual timings from results + actual_timings = {} + for r in results: + actual_timings.update(r.test_timings) + print("\n" + "=" * 70) print("PARALLEL AUTEST SUMMARY") print("=" * 70) @@ -306,14 +536,43 @@ def print_summary(results: List[TestResult], total_duration: float): for test in sorted(all_failed_tests): print(f" - {test}") + # Check for timing discrepancies + if expected_timings and actual_timings: + timing_warnings = [] + for test, actual in actual_timings.items(): + if test in expected_timings: + expected = expected_timings[test] + if expected > 0: + ratio = actual / expected + diff = abs(actual - expected) + # Flag if: (>2x ratio AND >10s diff) OR (>30s diff regardless of ratio) + if ((ratio > 2.0 or ratio < 0.5) and diff > 10) or diff > 30: + timing_warnings.append((test, expected, actual, ratio)) + + if timing_warnings: + print("-" * 70) + print("TIMING DISCREPANCIES (expected vs actual):") + for test, expected, actual, ratio in sorted(timing_warnings, key=lambda x: -abs(x[2] - x[1])): + direction = "slower" if actual > expected else "faster" + print(f" {test}: {expected:.1f}s -> {actual:.1f}s ({ratio:.1f}x {direction})") + print("=" * 70) # Per-worker summary print("\nPer-worker breakdown:") for r in results: status = "OK" if r.failed == 0 and r.exceptions == 0 else "FAIL" - print(f" Worker {r.worker_id}: {r.passed} passed, {r.failed} failed, " - f"{r.skipped} skipped ({r.duration:.1f}s) [{status}]") + if r.is_serial: + worker_label = " Serial: " + else: + worker_label = f" Worker:{r.worker_id:2d}" + print(f"{worker_label} {r.passed:3d} passed, {r.failed:3d} failed, " + f"{r.skipped:3d} skipped ({r.duration:6.1f}s) [{status}]") + + # Total summary line + print("-" * 70) + print(f" TOTAL: {total_passed:3d} passed, {total_failed:3d} failed, " + f"{total_skipped:3d} skipped ({total_duration:6.1f}s)") def main(): @@ -330,6 +589,12 @@ def main(): # List tests without running %(prog)s --list --ats-bin /opt/ats/bin + + # Collect timing data (runs tests one at a time for accurate timing) + %(prog)s -j 4 --collect-timings --ats-bin /opt/ats/bin --sandbox /tmp/autest + + # Use saved timing data for load-balanced partitioning + %(prog)s -j 16 --timings-file test-timings.json --ats-bin /opt/ats/bin --sandbox /tmp/autest 
''' ) @@ -376,6 +641,33 @@ def main(): default='gold_tests', help='Test directory relative to script location (default: gold_tests)' ) + parser.add_argument( + '--collect-timings', + action='store_true', + help='Run tests one at a time to collect accurate per-test timing data' + ) + parser.add_argument( + '--timings-file', + type=Path, + default=DEFAULT_TIMING_FILE, + help=f'Path to timing data JSON file (default: {DEFAULT_TIMING_FILE})' + ) + parser.add_argument( + '--no-timing', + action='store_true', + help='Disable timing-based load balancing (use round-robin partitioning)' + ) + parser.add_argument( + '--serial-tests-file', + type=Path, + default=DEFAULT_SERIAL_TESTS_FILE, + help=f'Path to file listing tests that must run serially (default: {DEFAULT_SERIAL_TESTS_FILE})' + ) + parser.add_argument( + '--no-serial', + action='store_true', + help='Skip serial tests entirely' + ) parser.add_argument( 'extra_args', nargs='*', @@ -392,28 +684,68 @@ def main(): print(f"Error: Test directory not found: {test_dir}", file=sys.stderr) sys.exit(1) + # Load serial tests list + serial_tests = load_serial_tests(args.serial_tests_file) + if serial_tests: + print(f"Loaded {len(serial_tests)} serial tests from {args.serial_tests_file}") + # Discover tests - tests = discover_tests(test_dir, args.filters) + all_tests = discover_tests(test_dir, args.filters) - if not tests: + if not all_tests: print("No tests found matching the specified filters.", file=sys.stderr) sys.exit(1) - print(f"Found {len(tests)} tests") + # Separate parallel and serial tests + parallel_tests = [t for t in all_tests if t not in serial_tests] + serial_tests_to_run = [t for t in all_tests if t in serial_tests] + + print(f"Found {len(all_tests)} tests ({len(parallel_tests)} parallel, {len(serial_tests_to_run)} serial)") if args.list: - print("\nTests:") - for test in tests: + print("\nParallel tests:") + for test in parallel_tests: print(f" {test}") + if serial_tests_to_run: + print("\nSerial tests (will run after parallel tests):") + for test in serial_tests_to_run: + print(f" {test}") sys.exit(0) - # Partition tests - num_jobs = min(args.jobs, len(tests)) - partitions = partition_tests(tests, num_jobs) - - print(f"Running with {len(partitions)} parallel workers") + # Load existing timing data + timings = {} + if not args.no_timing: + timings = load_timings(args.timings_file) + if timings: + known_tests = sum(1 for t in parallel_tests if t in timings) + print(f"Loaded timing data for {known_tests}/{len(parallel_tests)} parallel tests from {args.timings_file}") + + # Partition parallel tests + num_jobs = min(args.jobs, len(parallel_tests)) if parallel_tests else 0 + + if parallel_tests: + if timings and not args.no_timing: + # Use timing-based load balancing (LPT algorithm) + partitions, expected_loads = partition_tests_by_time(parallel_tests, num_jobs, timings) + print(f"Using timing-based load balancing") + if args.verbose: + for i, load in enumerate(expected_loads): + print(f" Worker {i}: {len(partitions[i])} tests, ~{load:.1f}s expected") + else: + # Fall back to simple round-robin partitioning + partitions = partition_tests(parallel_tests, num_jobs) + print(f"Using round-robin partitioning") + else: + partitions = [] + + if partitions: + print(f"Running with {len(partitions)} parallel workers") print(f"Port offset step: {args.port_offset_step}") print(f"Sandbox: {args.sandbox}") + if args.collect_timings: + print("Collecting per-test timing data (tests run sequentially per worker)") + if serial_tests_to_run and not 
args.no_serial: + print(f"Serial tests will run after parallel tests complete ({len(serial_tests_to_run)} tests)") # Create sandbox base directory sandbox_base = Path(args.sandbox) @@ -423,47 +755,101 @@ def main(): start_time = time.time() results: List[TestResult] = [] - with ProcessPoolExecutor(max_workers=len(partitions)) as executor: - futures = {} - for worker_id, worker_tests in enumerate(partitions): - future = executor.submit( - run_worker, - worker_id=worker_id, - tests=worker_tests, - script_dir=script_dir, - sandbox_base=sandbox_base, - ats_bin=args.ats_bin, - extra_args=args.extra_args or [], - port_offset_step=args.port_offset_step, - verbose=args.verbose + if partitions: + with ProcessPoolExecutor(max_workers=len(partitions)) as executor: + futures = {} + for worker_id, worker_tests in enumerate(partitions): + future = executor.submit( + run_worker, + worker_id=worker_id, + tests=worker_tests, + script_dir=script_dir, + sandbox_base=sandbox_base, + ats_bin=args.ats_bin, + extra_args=args.extra_args or [], + port_offset_step=args.port_offset_step, + verbose=args.verbose, + collect_timings=args.collect_timings + ) + futures[future] = worker_id + + # Collect results as they complete + for future in as_completed(futures): + worker_id = futures[future] + try: + result = future.result() + results.append(result) + status = "PASS" if result.failed == 0 else "FAIL" + print(f"[Worker {worker_id}] Completed: {result.passed} passed, " + f"{result.failed} failed ({result.duration:.1f}s) [{status}]") + except Exception as e: + print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) + results.append(TestResult( + worker_id=worker_id, + tests=partitions[worker_id], + failed=len(partitions[worker_id]), + output=str(e) + )) + + # Run serial tests after parallel tests complete + if serial_tests_to_run and not args.no_serial: + print(f"\n{'=' * 70}") + print("RUNNING SERIAL TESTS") + print(f"{'=' * 70}") + serial_start = time.time() + + # Use a special worker ID for serial tests (after parallel workers) + serial_worker_id = len(partitions) if partitions else 0 + + # Set up environment without port offset (serial tests run alone) + env = os.environ.copy() + env['AUTEST_PORT_OFFSET'] = '0' + + serial_result = TestResult(worker_id=serial_worker_id, tests=serial_tests_to_run, is_serial=True) + + for idx, test in enumerate(serial_tests_to_run, 1): + test_name, duration, passed, output = run_single_test( + test, script_dir, sandbox_base / "serial", args.ats_bin, + args.extra_args or [], env ) - futures[future] = worker_id + serial_result.test_timings[test_name] = duration - # Collect results as they complete - for future in as_completed(futures): - worker_id = futures[future] - try: - result = future.result() - results.append(result) - status = "PASS" if result.failed == 0 else "FAIL" - print(f"[Worker {worker_id}] Completed: {result.passed} passed, " - f"{result.failed} failed ({result.duration:.1f}s) [{status}]") - except Exception as e: - print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) - results.append(TestResult( - worker_id=worker_id, - tests=partitions[worker_id], - failed=len(partitions[worker_id]), - output=str(e) - )) + if passed: + serial_result.passed += 1 + else: + serial_result.failed += 1 + serial_result.failed_tests.append(test_name) + + status = "PASS" if passed else "FAIL" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"{timestamp} {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}", flush=True) + + 
serial_result.duration = time.time() - serial_start + results.append(serial_result) + + print(f"\n[Serial] Completed: {serial_result.passed} passed, " + f"{serial_result.failed} failed ({serial_result.duration:.1f}s)") total_duration = time.time() - start_time # Sort results by worker_id for consistent output results.sort(key=lambda r: r.worker_id) - # Print summary - print_summary(results, total_duration) + # Collect and save timing data if collected + if args.collect_timings: + new_timings = dict(timings) # Start with existing timings + tests_timed = 0 + for r in results: + for test_name, duration in r.test_timings.items(): + new_timings[test_name] = duration + tests_timed += 1 + if tests_timed > 0: + save_timings(args.timings_file, new_timings) + print(f"\nSaved timing data for {tests_timed} tests to {args.timings_file}") + print(f"Total tests in timing database: {len(new_timings)}") + + # Print summary (pass expected timings for discrepancy check) + print_summary(results, total_duration, timings if args.collect_timings else None) # Exit with non-zero if any tests failed total_failed = sum(r.failed + r.exceptions for r in results) diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py index 6e402ca0eaf..adb28cc3f24 100644 --- a/tests/gold_tests/autest-site/ports.py +++ b/tests/gold_tests/autest-site/ports.py @@ -198,9 +198,10 @@ def _setup_port_queue(amount=1000): g_ports.put(port) port += 1 if rmin > amount and g_ports.qsize() < amount: - port = 2001 # Fill in more ports, starting at 2001, well above well known ports, # and going up until the minimum port range used by the OS. + # Add port_offset to support parallel test execution (same as high range). + port = 2001 + port_offset while port < dmin and g_ports.qsize() < amount: if PortOpen(port, listening_ports=listening_ports): host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") diff --git a/tests/gold_tests/basic/config.test.py b/tests/gold_tests/basic/config.test.py index 1cb2bf8bc76..c78769465c7 100644 --- a/tests/gold_tests/basic/config.test.py +++ b/tests/gold_tests/basic/config.test.py @@ -18,8 +18,7 @@ Test.Summary = "Test start up of Traffic server with configuration modification of starting port" -ts = Test.MakeATSProcess("ts", select_ports=False) -ts.Variables.port = 8090 +ts = Test.MakeATSProcess("ts") ts.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts.Variables.port) + f" {ts.Variables.uds_path}", }) diff --git a/tests/gold_tests/basic/copy_config.test.py b/tests/gold_tests/basic/copy_config.test.py index ea08732be06..38961f8a851 100644 --- a/tests/gold_tests/basic/copy_config.test.py +++ b/tests/gold_tests/basic/copy_config.test.py @@ -18,16 +18,14 @@ Test.Summary = "Test start up of Traffic server with configuration modification of starting port of different servers at the same time" -# set up some ATS processes -ts1 = Test.MakeATSProcess("ts1", select_ports=False) -ts1.Variables.port = 8090 +# set up some ATS processes with dynamic port selection +ts1 = Test.MakeATSProcess("ts1") ts1.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts1.Variables.port) + f" {ts1.Variables.uds_path}", }) ts1.Ready = When.PortOpen(ts1.Variables.port) -ts2 = Test.MakeATSProcess("ts2", select_ports=False, enable_uds=False) -ts2.Variables.port = 8091 +ts2 = Test.MakeATSProcess("ts2", enable_uds=False) ts2.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts2.Variables.port), }) diff --git a/tests/serial_tests.txt 
b/tests/serial_tests.txt new file mode 100644 index 00000000000..43675a3b9d2 --- /dev/null +++ b/tests/serial_tests.txt @@ -0,0 +1,14 @@ +# Tests that must run serially (not in parallel) due to hardcoded ports or +# other constraints that prevent parallel execution. +# +# Format: One test file path per line, relative to tests/gold_tests/ +# Lines starting with # are comments and ignored. + +# Tests that intentionally use select_ports=False for specific port testing +tls/tls_sni_with_port.test.py + +# Tests using select_ports=False without dynamic ports +redirect/redirect_to_same_origin_on_cache.test.py + +# Tests with hardcoded ports that are difficult to make dynamic +parent_proxy/parent-retry.test.py From e02b566a6cae5c82ae5ccf14f0f131e8843b7134 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sat, 7 Feb 2026 17:59:10 -0800 Subject: [PATCH 03/11] autest: Fix parallel runner build-root, skip detection, and false positives - Add --build-root CLI argument to properly locate test plugins in the build directory (fixes ~57 test failures from missing test plugins) - Fix skip detection: tests skipped due to missing dependencies (lua, QUIC, go-httpbin, uri_signing) are now reported as SKIP, not FAIL - Fix false-positive detection: tests that error at setup (e.g., missing proxy verifier) with 0 pass/0 fail are now correctly reported as FAIL - Return PASS/FAIL/SKIP status from run_single_test() instead of bool - Track skipped count in per-worker results and summary output --- tests/autest-parallel.py | 68 +++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index 7a9e718c261..9500b1aba84 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -337,19 +337,22 @@ def run_single_test( script_dir: Path, sandbox: Path, ats_bin: str, + build_root: str, extra_args: List[str], env: dict -) -> Tuple[str, float, bool, str]: +) -> Tuple[str, float, str, str]: """ Run a single test and return its timing. 
Returns: - Tuple of (test_name, duration, passed, output) + Tuple of (test_name, duration, status, output) + status is one of: "PASS", "FAIL", "SKIP" """ cmd = [ 'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, + '--build-root', build_root, '--sandbox', str(sandbox / test), '--filters', test ] @@ -368,12 +371,22 @@ def run_single_test( duration = time.time() - start output = proc.stdout + proc.stderr parsed = parse_autest_output(output) - passed = parsed['failed'] == 0 and parsed['exceptions'] == 0 - return (test, duration, passed, output) + # Determine status: + # - SKIP: test was skipped (missing dependency, unsupported feature) + # - PASS: test ran and passed + # - FAIL: test failed, had exceptions, or nothing ran at all + if parsed['skipped'] > 0 and parsed['passed'] == 0 and parsed['failed'] == 0: + status = "SKIP" + elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 + and proc.returncode == 0 and (parsed['passed'] > 0 or parsed['skipped'] > 0)): + status = "PASS" + else: + status = "FAIL" + return (test, duration, status, output) except subprocess.TimeoutExpired: - return (test, 600.0, False, "TIMEOUT") + return (test, 600.0, "FAIL", "TIMEOUT") except Exception as e: - return (test, time.time() - start, False, str(e)) + return (test, time.time() - start, "FAIL", str(e)) def run_worker( @@ -382,6 +395,7 @@ def run_worker( script_dir: Path, sandbox_base: Path, ats_bin: str, + build_root: str, extra_args: List[str], port_offset_step: int = 1000, verbose: bool = False, @@ -396,6 +410,7 @@ def run_worker( script_dir: Directory containing autest.sh sandbox_base: Base sandbox directory ats_bin: Path to ATS bin directory + build_root: Path to the build directory (for test plugins etc.) extra_args: Additional arguments to pass to autest port_offset_step: Port offset between workers verbose: Whether to print verbose output @@ -423,19 +438,20 @@ def run_worker( all_output = [] total_tests = len(tests) for idx, test in enumerate(tests, 1): - test_name, duration, passed, output = run_single_test( - test, script_dir, sandbox, ats_bin, extra_args, env + test_name, duration, status, output = run_single_test( + test, script_dir, sandbox, ats_bin, build_root, extra_args, env ) result.test_timings[test_name] = duration all_output.append(output) - if passed: + if status == "PASS": result.passed += 1 + elif status == "SKIP": + result.skipped += 1 else: result.failed += 1 result.failed_tests.append(test_name) - status = "PASS" if passed else "FAIL" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Fixed-width format: date time status duration worker progress test_name print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True) @@ -448,6 +464,7 @@ def run_worker( 'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, + '--build-root', build_root, '--sandbox', str(sandbox), ] @@ -483,6 +500,14 @@ def run_worker( result.unknown = parsed['unknown'] result.failed_tests = parsed['failed_tests'] + # If no tests ran at all (passed + failed + skipped == 0), + # autest likely errored at setup (e.g., missing proxy verifier). + # Count all tests as failed to avoid false positives. 
+ total_ran = result.passed + result.failed + result.skipped + if total_ran == 0 and proc.returncode != 0: + result.failed = len(tests) + result.failed_tests = list(tests) + except subprocess.TimeoutExpired: result.output = "TIMEOUT: Worker exceeded 1 hour timeout" result.return_code = -1 @@ -609,6 +634,12 @@ def main(): required=True, help='Path to ATS bin directory' ) + parser.add_argument( + '--build-root', + default=None, + help='Path to the build directory (for test plugins, etc.). ' + 'Defaults to the source tree root.' + ) parser.add_argument( '--sandbox', default='/tmp/autest-parallel', @@ -680,6 +711,12 @@ def main(): script_dir = Path(__file__).parent.resolve() test_dir = script_dir / args.test_dir + # Resolve build root (defaults to source tree root, i.e. parent of tests/) + if args.build_root: + build_root = str(Path(args.build_root).resolve()) + else: + build_root = str(script_dir.parent) + if not test_dir.exists(): print(f"Error: Test directory not found: {test_dir}", file=sys.stderr) sys.exit(1) @@ -740,6 +777,7 @@ def main(): if partitions: print(f"Running with {len(partitions)} parallel workers") + print(f"Build root: {build_root}") print(f"Port offset step: {args.port_offset_step}") print(f"Sandbox: {args.sandbox}") if args.collect_timings: @@ -766,6 +804,7 @@ def main(): script_dir=script_dir, sandbox_base=sandbox_base, ats_bin=args.ats_bin, + build_root=build_root, extra_args=args.extra_args or [], port_offset_step=args.port_offset_step, verbose=args.verbose, @@ -808,19 +847,20 @@ def main(): serial_result = TestResult(worker_id=serial_worker_id, tests=serial_tests_to_run, is_serial=True) for idx, test in enumerate(serial_tests_to_run, 1): - test_name, duration, passed, output = run_single_test( + test_name, duration, status, output = run_single_test( test, script_dir, sandbox_base / "serial", args.ats_bin, - args.extra_args or [], env + build_root, args.extra_args or [], env ) serial_result.test_timings[test_name] = duration - if passed: + if status == "PASS": serial_result.passed += 1 + elif status == "SKIP": + serial_result.skipped += 1 else: serial_result.failed += 1 serial_result.failed_tests.append(test_name) - status = "PASS" if passed else "FAIL" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{timestamp} {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}", flush=True) From e1318fcd9352d141046b03b06f6df442ba94d6e5 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sat, 7 Feb 2026 20:38:51 -0800 Subject: [PATCH 04/11] Improve verbose mode for interactive use - In batch mode with -v, stream autest output in real-time using Popen instead of subprocess.run, showing "Running Test..." lines as they happen - Print test list preview per worker during partitioning (-v) - Show timestamps and skipped counts in worker completion messages - Print first 5 test names per worker at batch start (-v) --- tests/autest-parallel.py | 64 +++++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index 9500b1aba84..fad0d057c25 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -477,18 +477,44 @@ def run_worker( timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{timestamp} Worker:{worker_id:2d} Starting batch of {len(tests)} tests (port offset {port_offset})", flush=True) + if verbose: + print(f" Worker:{worker_id:2d} Tests: {', '.join(tests[:5])}" + f"{'...' 
if len(tests) > 5 else ''}", flush=True) try: - proc = subprocess.run( - cmd, - cwd=script_dir, - capture_output=True, - text=True, - env=env, - timeout=3600 # 1 hour timeout per worker - ) - result.output = proc.stdout + proc.stderr - result.return_code = proc.returncode + if verbose: + # Stream output in real-time so the user sees test progress + proc = subprocess.Popen( + cmd, + cwd=script_dir, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + env=env, + ) + output_lines = [] + for line in proc.stdout: + output_lines.append(line) + # Print lines that show test progress + clean = strip_ansi(line).strip() + if clean.startswith('Running Test') or 'Passed' in clean or 'Failed' in clean: + if clean.startswith('Running Test'): + ts = datetime.now().strftime("%H:%M:%S") + print(f" [{ts}] Worker:{worker_id:2d} {clean}", flush=True) + proc.wait(timeout=3600) + result.output = ''.join(output_lines) + result.return_code = proc.returncode + else: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + timeout=3600 # 1 hour timeout per worker + ) + result.output = proc.stdout + proc.stderr + result.return_code = proc.returncode # Parse results parsed = parse_autest_output(result.output) @@ -504,7 +530,7 @@ def run_worker( # autest likely errored at setup (e.g., missing proxy verifier). # Count all tests as failed to avoid false positives. total_ran = result.passed + result.failed + result.skipped - if total_ran == 0 and proc.returncode != 0: + if total_ran == 0 and (hasattr(proc, 'returncode') and proc.returncode != 0): result.failed = len(tests) result.failed_tests = list(tests) @@ -767,11 +793,18 @@ def main(): print(f"Using timing-based load balancing") if args.verbose: for i, load in enumerate(expected_loads): - print(f" Worker {i}: {len(partitions[i])} tests, ~{load:.1f}s expected") + tests_preview = ', '.join(partitions[i][:3]) + suffix = f", ... (+{len(partitions[i])-3} more)" if len(partitions[i]) > 3 else "" + print(f" Worker {i}: {len(partitions[i])} tests, ~{load:.1f}s expected [{tests_preview}{suffix}]") else: # Fall back to simple round-robin partitioning partitions = partition_tests(parallel_tests, num_jobs) print(f"Using round-robin partitioning") + if args.verbose: + for i in range(len(partitions)): + tests_preview = ', '.join(partitions[i][:3]) + suffix = f", ... 
(+{len(partitions[i])-3} more)" if len(partitions[i]) > 3 else "" + print(f" Worker {i}: {len(partitions[i])} tests [{tests_preview}{suffix}]") else: partitions = [] @@ -819,8 +852,11 @@ def main(): result = future.result() results.append(result) status = "PASS" if result.failed == 0 else "FAIL" - print(f"[Worker {worker_id}] Completed: {result.passed} passed, " - f"{result.failed} failed ({result.duration:.1f}s) [{status}]") + ts = datetime.now().strftime("%H:%M:%S") + parts = [f"{result.passed} passed", f"{result.failed} failed"] + if result.skipped > 0: + parts.append(f"{result.skipped} skipped") + print(f"[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} ({result.duration:.1f}s) [{status}]") except Exception as e: print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) results.append(TestResult( From f432732fa2b09cf1257ebf314cfa20af182b9398 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sat, 7 Feb 2026 20:47:49 -0800 Subject: [PATCH 05/11] Add live progress line with test counts and ETA Default output now shows a single in-place progress line (using \r) that updates as workers complete: [Parallel] 145/389 tests (37%) | 8/16 workers done | 52s elapsed | ETA: 1m 28s - Shows tests done/total with percentage - Workers completed out of total - Failed and skipped counts (when non-zero) - Elapsed time and estimated time remaining - Updates in-place for both parallel and serial phases - Verbose mode (-v) still prints per-worker detail lines above the progress --- tests/autest-parallel.py | 94 ++++++++++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 14 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index fad0d057c25..80bb0948007 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -808,25 +808,65 @@ def main(): else: partitions = [] + # Compute totals for progress tracking + total_tests = len(parallel_tests) + (len(serial_tests_to_run) if serial_tests_to_run and not args.no_serial else 0) + serial_count = len(serial_tests_to_run) if serial_tests_to_run and not args.no_serial else 0 + if partitions: print(f"Running with {len(partitions)} parallel workers") + print(f"Total: {total_tests} tests ({len(parallel_tests)} parallel across {len(partitions)} workers" + f"{f', {serial_count} serial' if serial_count else ''})") print(f"Build root: {build_root}") print(f"Port offset step: {args.port_offset_step}") print(f"Sandbox: {args.sandbox}") if args.collect_timings: print("Collecting per-test timing data (tests run sequentially per worker)") - if serial_tests_to_run and not args.no_serial: - print(f"Serial tests will run after parallel tests complete ({len(serial_tests_to_run)} tests)") + print() # Create sandbox base directory sandbox_base = Path(args.sandbox) sandbox_base.mkdir(parents=True, exist_ok=True) + # Progress tracking state + tests_done = 0 + tests_passed = 0 + tests_failed = 0 + tests_skipped = 0 + workers_done = 0 + total_workers = len(partitions) + + def format_eta(elapsed: float, done: int, total: int) -> str: + """Estimate remaining time based on progress so far.""" + if done == 0 or done >= total: + return "--:--" + rate = elapsed / done + remaining = rate * (total - done) + mins, secs = divmod(int(remaining), 60) + if mins >= 60: + hours, mins = divmod(mins, 60) + return f"{hours}h {mins:02d}m" + return f"{mins}m {secs:02d}s" + + def print_progress(phase: str = "Parallel"): + """Print an in-place progress line using \\r.""" + elapsed = time.time() - start_time + elapsed_str = f"{int(elapsed)}s" + eta = 
format_eta(elapsed, tests_done, total_tests) + pct = (tests_done * 100 // total_tests) if total_tests > 0 else 0 + fail_str = f" | {tests_failed} FAILED" if tests_failed > 0 else "" + skip_str = f" | {tests_skipped} skipped" if tests_skipped > 0 else "" + line = (f"\r[{phase}] {tests_done}/{total_tests} tests ({pct}%) " + f"| {workers_done}/{total_workers} workers done" + f"{fail_str}{skip_str}" + f" | {elapsed_str} elapsed | ETA: {eta} ") + print(line, end='', flush=True) + # Run workers in parallel start_time = time.time() results: List[TestResult] = [] if partitions: + print_progress() with ProcessPoolExecutor(max_workers=len(partitions)) as executor: futures = {} for worker_id, worker_tests in enumerate(partitions): @@ -851,24 +891,43 @@ def main(): try: result = future.result() results.append(result) - status = "PASS" if result.failed == 0 else "FAIL" - ts = datetime.now().strftime("%H:%M:%S") - parts = [f"{result.passed} passed", f"{result.failed} failed"] - if result.skipped > 0: - parts.append(f"{result.skipped} skipped") - print(f"[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} ({result.duration:.1f}s) [{status}]") + workers_done += 1 + tests_done += result.passed + result.failed + result.skipped + tests_passed += result.passed + tests_failed += result.failed + tests_skipped += result.skipped + + if args.verbose: + # In verbose mode, print detail line then progress + status = "PASS" if result.failed == 0 else "FAIL" + ts = datetime.now().strftime("%H:%M:%S") + parts = [f"{result.passed} passed", f"{result.failed} failed"] + if result.skipped > 0: + parts.append(f"{result.skipped} skipped") + # Clear the progress line, print detail, then re-print progress + print(f"\r[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} " + f"({result.duration:.1f}s) [{status}]" + " " * 20) + + print_progress() except Exception as e: - print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) + print(f"\r[Worker {worker_id}] Error: {e}" + " " * 20, file=sys.stderr) + workers_done += 1 + tests_done += len(partitions[worker_id]) + tests_failed += len(partitions[worker_id]) results.append(TestResult( worker_id=worker_id, tests=partitions[worker_id], failed=len(partitions[worker_id]), output=str(e) )) + print_progress() + + # Clear the progress line after parallel phase + print() # Run serial tests after parallel tests complete if serial_tests_to_run and not args.no_serial: - print(f"\n{'=' * 70}") + print(f"{'=' * 70}") print("RUNNING SERIAL TESTS") print(f"{'=' * 70}") serial_start = time.time() @@ -891,20 +950,27 @@ def main(): if status == "PASS": serial_result.passed += 1 + tests_passed += 1 elif status == "SKIP": serial_result.skipped += 1 + tests_skipped += 1 else: serial_result.failed += 1 serial_result.failed_tests.append(test_name) + tests_failed += 1 + tests_done += 1 - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"{timestamp} {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}", flush=True) + if args.verbose: + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"\r[{timestamp}] {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}" + " " * 20) + + print_progress(phase="Serial") serial_result.duration = time.time() - serial_start results.append(serial_result) - print(f"\n[Serial] Completed: {serial_result.passed} passed, " - f"{serial_result.failed} failed ({serial_result.duration:.1f}s)") + # Clear the progress line after serial phase + print() total_duration = time.time() - start_time From 
a7ebae6de7ed74ca56358778492af82560c2785b Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sun, 8 Feb 2026 08:10:05 -0800 Subject: [PATCH 06/11] Fix test counting and update serial tests list - Progress line and summary now count top-level tests, not autest sub-test results. Previously thread_config (12 sub-tests) inflated the count, causing progress to exceed 100%. - Deduplicate failed test names in summary output - Remove tls_sni_with_port, redirect_to_same_origin_on_cache, and parent-retry from serial list -- all use dynamic ports via get_port() and run fine in parallel - Add thread_config to serial list -- spins up 12 ATS instances and fails under parallel load due to resource contention --- tests/autest-parallel.py | 22 ++++++++++++++-------- tests/serial_tests.txt | 14 ++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index 80bb0948007..4b6d5e6f673 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -556,11 +556,17 @@ def print_summary(results: List[TestResult], total_duration: float, total_warnings = sum(r.warnings for r in results) total_exceptions = sum(r.exceptions for r in results) total_unknown = sum(r.unknown for r in results) - total_tests = total_passed + total_failed + total_skipped + total_warnings + total_exceptions + total_unknown + # Use actual test count (top-level tests), not sub-test counts from autest + total_tests = sum(len(r.tests) for r in results) + # Deduplicate failed test names (a test may appear in sub-test output multiple times) + seen = set() all_failed_tests = [] for r in results: - all_failed_tests.extend(r.failed_tests) + for t in r.failed_tests: + if t not in seen: + seen.add(t) + all_failed_tests.append(t) # Collect actual timings from results actual_timings = {} @@ -827,9 +833,8 @@ def main(): sandbox_base = Path(args.sandbox) sandbox_base.mkdir(parents=True, exist_ok=True) - # Progress tracking state + # Progress tracking state (counts top-level tests, not autest sub-tests) tests_done = 0 - tests_passed = 0 tests_failed = 0 tests_skipped = 0 workers_done = 0 @@ -892,9 +897,11 @@ def print_progress(phase: str = "Parallel"): result = future.result() results.append(result) workers_done += 1 - tests_done += result.passed + result.failed + result.skipped - tests_passed += result.passed - tests_failed += result.failed + # Use actual test count (top-level), not autest sub-test counts + tests_done += len(result.tests) + # Count top-level tests that failed (from failed_tests list) + tests_failed += len(result.failed_tests) + # Skipped is still useful from autest counts for visibility tests_skipped += result.skipped if args.verbose: @@ -950,7 +957,6 @@ def print_progress(phase: str = "Parallel"): if status == "PASS": serial_result.passed += 1 - tests_passed += 1 elif status == "SKIP": serial_result.skipped += 1 tests_skipped += 1 diff --git a/tests/serial_tests.txt b/tests/serial_tests.txt index 43675a3b9d2..d6eff1d2949 100644 --- a/tests/serial_tests.txt +++ b/tests/serial_tests.txt @@ -1,14 +1,8 @@ -# Tests that must run serially (not in parallel) due to hardcoded ports or -# other constraints that prevent parallel execution. +# Tests that must run serially (not in parallel) due to resource conflicts +# or other constraints that prevent parallel execution. # # Format: One test file path per line, relative to tests/gold_tests/ # Lines starting with # are comments and ignored. 
-# Tests that intentionally use select_ports=False for specific port testing -tls/tls_sni_with_port.test.py - -# Tests using select_ports=False without dynamic ports -redirect/redirect_to_same_origin_on_cache.test.py - -# Tests with hardcoded ports that are difficult to make dynamic -parent_proxy/parent-retry.test.py +# Spins up 12 ATS instances with varying thread configs; fails under parallel load +thread_config/thread_config.test.py From 63f6bc8bffca04514a26e6b28ef2825365d481a4 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sun, 8 Feb 2026 08:28:48 -0800 Subject: [PATCH 07/11] docs: Add parallel test runner section to tests/README.md Document usage of autest-parallel.py including key options, timing-based load balancing, and how to add serial tests. --- tests/README.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/README.md b/tests/README.md index 5a8ae9dc4ad..57127a45a46 100644 --- a/tests/README.md +++ b/tests/README.md @@ -40,6 +40,41 @@ The corresponding `autest.sh` command is: $ ./autest.sh --filter=something_descriptive +# Running tests in parallel + +For faster test execution, a parallel test runner is available that distributes +tests across multiple workers. This is especially useful on machines with many +CPU cores. + + $ python3 autest-parallel.py -j 16 --ats-bin /bin --build-root --sandbox /tmp/autest-parallel + +Key options: + +* `-j N` - Number of parallel workers (default: number of CPU cores) +* `--ats-bin` - Path to the ATS install bin directory +* `--build-root` - Path to the build directory (for test plugins) +* `--sandbox` - Directory for test sandboxes (default: `/tmp/autest-parallel`) +* `-v` - Verbose output with real-time test progress per worker +* `--collect-timings` - Run tests individually to collect per-test timing data +* `--list` - List all tests and exit (useful for checking test discovery) + +The parallel runner uses port offsets to ensure each worker gets a unique port +range, preventing conflicts between concurrent test instances. Tests known to +require serial execution (listed in `serial_tests.txt`) are run sequentially +after the parallel phase completes. + +## Timing-based load balancing + +If a `test-timings.json` file exists (generated by a previous run with +`--collect-timings`), the runner uses the Longest Processing Time (LPT) +algorithm to distribute tests across workers for balanced execution times. +Without timing data, tests are distributed round-robin. + +## Adding serial tests + +If a test cannot run in parallel (e.g., it uses hardcoded global resources), +add its path relative to `gold_tests/` to `serial_tests.txt`. + # Advanced setup AuTest and the relevant tools can be install manually instead of using the wrapper script. By doing this, it is often easier to debug issues with the testing system, or the tests. There are two ways this can be done. From c9ea3bd5e4f801f29a320a5b091b528e5c8b7c40 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sun, 8 Feb 2026 08:36:01 -0800 Subject: [PATCH 08/11] Format autest-parallel.py with yapf Apply project yapf formatting rules to pass CI format check. 
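As a rough illustration of the timing-based load balancing the README section above describes: the LPT (Longest Processing Time first) heuristic sorts tests by expected duration and hands each one to the currently least-loaded worker. This is only a sketch of the idea — the function name lpt_partition is illustrative, not the script's actual partition_tests_by_time implementation, and DEFAULT_TEST_TIME stands in for tests with no recorded timing:

    import heapq
    from typing import Dict, List, Tuple

    DEFAULT_TEST_TIME = 15.0  # fallback estimate for tests with no recorded timing

    def lpt_partition(tests: List[str], num_jobs: int,
                      timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]:
        """Greedy LPT: hand the longest remaining test to the least-loaded worker."""
        num_workers = max(1, min(num_jobs, len(tests) or 1))
        # Longest expected duration first.
        ordered = sorted(tests, key=lambda t: timings.get(t, DEFAULT_TEST_TIME), reverse=True)
        # Min-heap of (current_load, worker_index); the lightest worker pops first.
        heap = [(0.0, i) for i in range(num_workers)]
        heapq.heapify(heap)
        partitions: List[List[str]] = [[] for _ in range(num_workers)]
        for test in ordered:
            load, idx = heapq.heappop(heap)
            partitions[idx].append(test)
            heapq.heappush(heap, (load + timings.get(test, DEFAULT_TEST_TIME), idx))
        expected_loads = [0.0] * num_workers
        for load, idx in heap:
            expected_loads[idx] = load
        return partitions, expected_loads

For example, with timings {'a': 30, 'b': 20, 'c': 10, 'd': 5} and two workers, this sketch puts 'a' and 'd' on one worker (~35s) and 'b' and 'c' on the other (~30s), which is why balanced partitions need timing data; without it the runner falls back to round-robin.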
--- tests/autest-parallel.py | 211 ++++++++++++++------------------------- 1 file changed, 75 insertions(+), 136 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index 4b6d5e6f673..d98068bc339 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -170,11 +170,7 @@ def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: return [p for p in partitions if p] # Remove empty partitions -def partition_tests_by_time( - tests: List[str], - num_jobs: int, - timings: Dict[str, float] -) -> Tuple[List[List[str]], List[float]]: +def partition_tests_by_time(tests: List[str], num_jobs: int, timings: Dict[str, float]) -> Tuple[List[List[str]], List[float]]: """ Partition tests using LPT (Longest Processing Time first) algorithm. This balances the load across workers based on expected test duration. @@ -332,15 +328,8 @@ def parse_autest_output(output: str) -> dict: return result -def run_single_test( - test: str, - script_dir: Path, - sandbox: Path, - ats_bin: str, - build_root: str, - extra_args: List[str], - env: dict -) -> Tuple[str, float, str, str]: +def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, build_root: str, extra_args: List[str], + env: dict) -> Tuple[str, float, str, str]: """ Run a single test and return its timing. @@ -349,12 +338,8 @@ def run_single_test( status is one of: "PASS", "FAIL", "SKIP" """ cmd = [ - 'uv', 'run', 'autest', 'run', - '--directory', 'gold_tests', - '--ats-bin', ats_bin, - '--build-root', build_root, - '--sandbox', str(sandbox / test), - '--filters', test + 'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, '--build-root', build_root, '--sandbox', + str(sandbox / test), '--filters', test ] cmd.extend(extra_args) @@ -377,8 +362,8 @@ def run_single_test( # - FAIL: test failed, had exceptions, or nothing ran at all if parsed['skipped'] > 0 and parsed['passed'] == 0 and parsed['failed'] == 0: status = "SKIP" - elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 - and proc.returncode == 0 and (parsed['passed'] > 0 or parsed['skipped'] > 0)): + elif (parsed['failed'] == 0 and parsed['exceptions'] == 0 and proc.returncode == 0 and + (parsed['passed'] > 0 or parsed['skipped'] > 0)): status = "PASS" else: status = "FAIL" @@ -390,17 +375,16 @@ def run_single_test( def run_worker( - worker_id: int, - tests: List[str], - script_dir: Path, - sandbox_base: Path, - ats_bin: str, - build_root: str, - extra_args: List[str], - port_offset_step: int = 1000, - verbose: bool = False, - collect_timings: bool = False -) -> TestResult: + worker_id: int, + tests: List[str], + script_dir: Path, + sandbox_base: Path, + ats_bin: str, + build_root: str, + extra_args: List[str], + port_offset_step: int = 1000, + verbose: bool = False, + collect_timings: bool = False) -> TestResult: """ Run autest on a subset of tests with isolated sandbox and port range. 
@@ -438,9 +422,7 @@ def run_worker( all_output = [] total_tests = len(tests) for idx, test in enumerate(tests, 1): - test_name, duration, status, output = run_single_test( - test, script_dir, sandbox, ats_bin, build_root, extra_args, env - ) + test_name, duration, status, output = run_single_test(test, script_dir, sandbox, ats_bin, build_root, extra_args, env) result.test_timings[test_name] = duration all_output.append(output) @@ -461,11 +443,18 @@ def run_worker( else: # Run all tests in batch (faster but no per-test timing) cmd = [ - 'uv', 'run', 'autest', 'run', - '--directory', 'gold_tests', - '--ats-bin', ats_bin, - '--build-root', build_root, - '--sandbox', str(sandbox), + 'uv', + 'run', + 'autest', + 'run', + '--directory', + 'gold_tests', + '--ats-bin', + ats_bin, + '--build-root', + build_root, + '--sandbox', + str(sandbox), ] # Add test filters @@ -478,8 +467,10 @@ def run_worker( timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{timestamp} Worker:{worker_id:2d} Starting batch of {len(tests)} tests (port offset {port_offset})", flush=True) if verbose: - print(f" Worker:{worker_id:2d} Tests: {', '.join(tests[:5])}" - f"{'...' if len(tests) > 5 else ''}", flush=True) + print( + f" Worker:{worker_id:2d} Tests: {', '.join(tests[:5])}" + f"{'...' if len(tests) > 5 else ''}", + flush=True) try: if verbose: @@ -547,8 +538,7 @@ def run_worker( return result -def print_summary(results: List[TestResult], total_duration: float, - expected_timings: Optional[Dict[str, float]] = None): +def print_summary(results: List[TestResult], total_duration: float, expected_timings: Optional[Dict[str, float]] = None): """Print aggregated results from all workers.""" total_passed = sum(r.passed for r in results) total_failed = sum(r.failed for r in results) @@ -623,13 +613,15 @@ def print_summary(results: List[TestResult], total_duration: float, worker_label = " Serial: " else: worker_label = f" Worker:{r.worker_id:2d}" - print(f"{worker_label} {r.passed:3d} passed, {r.failed:3d} failed, " - f"{r.skipped:3d} skipped ({r.duration:6.1f}s) [{status}]") + print( + f"{worker_label} {r.passed:3d} passed, {r.failed:3d} failed, " + f"{r.skipped:3d} skipped ({r.duration:6.1f}s) [{status}]") # Total summary line print("-" * 70) - print(f" TOTAL: {total_passed:3d} passed, {total_failed:3d} failed, " - f"{total_skipped:3d} skipped ({total_duration:6.1f}s)") + print( + f" TOTAL: {total_passed:3d} passed, {total_failed:3d} failed, " + f"{total_skipped:3d} skipped ({total_duration:6.1f}s)") def main(): @@ -652,90 +644,39 @@ def main(): # Use saved timing data for load-balanced partitioning %(prog)s -j 16 --timings-file test-timings.json --ats-bin /opt/ats/bin --sandbox /tmp/autest -''' - ) +''') parser.add_argument( - '-j', '--jobs', - type=int, - default=os.cpu_count() or 4, - help='Number of parallel workers (default: CPU count)' - ) - parser.add_argument( - '--ats-bin', - required=True, - help='Path to ATS bin directory' - ) + '-j', '--jobs', type=int, default=os.cpu_count() or 4, help='Number of parallel workers (default: CPU count)') + parser.add_argument('--ats-bin', required=True, help='Path to ATS bin directory') parser.add_argument( '--build-root', default=None, help='Path to the build directory (for test plugins, etc.). ' - 'Defaults to the source tree root.' 
- ) - parser.add_argument( - '--sandbox', - default='/tmp/autest-parallel', - help='Base sandbox directory (default: /tmp/autest-parallel)' - ) - parser.add_argument( - '-f', '--filter', - action='append', - dest='filters', - help='Filter tests by glob pattern (can be specified multiple times)' - ) - parser.add_argument( - '--list', - action='store_true', - help='List tests without running' - ) + 'Defaults to the source tree root.') + parser.add_argument('--sandbox', default='/tmp/autest-parallel', help='Base sandbox directory (default: /tmp/autest-parallel)') parser.add_argument( - '--port-offset-step', - type=int, - default=1000, - help='Port offset between workers (default: 1000)' - ) + '-f', '--filter', action='append', dest='filters', help='Filter tests by glob pattern (can be specified multiple times)') + parser.add_argument('--list', action='store_true', help='List tests without running') + parser.add_argument('--port-offset-step', type=int, default=1000, help='Port offset between workers (default: 1000)') + parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') + parser.add_argument('--test-dir', default='gold_tests', help='Test directory relative to script location (default: gold_tests)') parser.add_argument( - '-v', '--verbose', - action='store_true', - help='Verbose output' - ) - parser.add_argument( - '--test-dir', - default='gold_tests', - help='Test directory relative to script location (default: gold_tests)' - ) - parser.add_argument( - '--collect-timings', - action='store_true', - help='Run tests one at a time to collect accurate per-test timing data' - ) + '--collect-timings', action='store_true', help='Run tests one at a time to collect accurate per-test timing data') parser.add_argument( '--timings-file', type=Path, default=DEFAULT_TIMING_FILE, - help=f'Path to timing data JSON file (default: {DEFAULT_TIMING_FILE})' - ) + help=f'Path to timing data JSON file (default: {DEFAULT_TIMING_FILE})') parser.add_argument( - '--no-timing', - action='store_true', - help='Disable timing-based load balancing (use round-robin partitioning)' - ) + '--no-timing', action='store_true', help='Disable timing-based load balancing (use round-robin partitioning)') parser.add_argument( '--serial-tests-file', type=Path, default=DEFAULT_SERIAL_TESTS_FILE, - help=f'Path to file listing tests that must run serially (default: {DEFAULT_SERIAL_TESTS_FILE})' - ) - parser.add_argument( - '--no-serial', - action='store_true', - help='Skip serial tests entirely' - ) - parser.add_argument( - 'extra_args', - nargs='*', - help='Additional arguments to pass to autest' - ) + help=f'Path to file listing tests that must run serially (default: {DEFAULT_SERIAL_TESTS_FILE})') + parser.add_argument('--no-serial', action='store_true', help='Skip serial tests entirely') + parser.add_argument('extra_args', nargs='*', help='Additional arguments to pass to autest') args = parser.parse_args() @@ -820,8 +761,9 @@ def main(): if partitions: print(f"Running with {len(partitions)} parallel workers") - print(f"Total: {total_tests} tests ({len(parallel_tests)} parallel across {len(partitions)} workers" - f"{f', {serial_count} serial' if serial_count else ''})") + print( + f"Total: {total_tests} tests ({len(parallel_tests)} parallel across {len(partitions)} workers" + f"{f', {serial_count} serial' if serial_count else ''})") print(f"Build root: {build_root}") print(f"Port offset step: {args.port_offset_step}") print(f"Sandbox: {args.sandbox}") @@ -860,10 +802,11 @@ def print_progress(phase: str = 
"Parallel"): pct = (tests_done * 100 // total_tests) if total_tests > 0 else 0 fail_str = f" | {tests_failed} FAILED" if tests_failed > 0 else "" skip_str = f" | {tests_skipped} skipped" if tests_skipped > 0 else "" - line = (f"\r[{phase}] {tests_done}/{total_tests} tests ({pct}%) " - f"| {workers_done}/{total_workers} workers done" - f"{fail_str}{skip_str}" - f" | {elapsed_str} elapsed | ETA: {eta} ") + line = ( + f"\r[{phase}] {tests_done}/{total_tests} tests ({pct}%) " + f"| {workers_done}/{total_workers} workers done" + f"{fail_str}{skip_str}" + f" | {elapsed_str} elapsed | ETA: {eta} ") print(line, end='', flush=True) # Run workers in parallel @@ -886,8 +829,7 @@ def print_progress(phase: str = "Parallel"): extra_args=args.extra_args or [], port_offset_step=args.port_offset_step, verbose=args.verbose, - collect_timings=args.collect_timings - ) + collect_timings=args.collect_timings) futures[future] = worker_id # Collect results as they complete @@ -912,8 +854,9 @@ def print_progress(phase: str = "Parallel"): if result.skipped > 0: parts.append(f"{result.skipped} skipped") # Clear the progress line, print detail, then re-print progress - print(f"\r[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} " - f"({result.duration:.1f}s) [{status}]" + " " * 20) + print( + f"\r[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} " + f"({result.duration:.1f}s) [{status}]" + " " * 20) print_progress() except Exception as e: @@ -921,12 +864,9 @@ def print_progress(phase: str = "Parallel"): workers_done += 1 tests_done += len(partitions[worker_id]) tests_failed += len(partitions[worker_id]) - results.append(TestResult( - worker_id=worker_id, - tests=partitions[worker_id], - failed=len(partitions[worker_id]), - output=str(e) - )) + results.append( + TestResult( + worker_id=worker_id, tests=partitions[worker_id], failed=len(partitions[worker_id]), output=str(e))) print_progress() # Clear the progress line after parallel phase @@ -950,9 +890,7 @@ def print_progress(phase: str = "Parallel"): for idx, test in enumerate(serial_tests_to_run, 1): test_name, duration, status, output = run_single_test( - test, script_dir, sandbox_base / "serial", args.ats_bin, - build_root, args.extra_args or [], env - ) + test, script_dir, sandbox_base / "serial", args.ats_bin, build_root, args.extra_args or [], env) serial_result.test_timings[test_name] = duration if status == "PASS": @@ -968,7 +906,8 @@ def print_progress(phase: str = "Parallel"): if args.verbose: timestamp = datetime.now().strftime("%H:%M:%S") - print(f"\r[{timestamp}] {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}" + " " * 20) + print( + f"\r[{timestamp}] {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}" + " " * 20) print_progress(phase="Serial") From 3e28d16350805ff97a1d95328071d5e5722c4294 Mon Sep 17 00:00:00 2001 From: Bryan Call Date: Sun, 8 Feb 2026 09:17:45 -0800 Subject: [PATCH 09/11] Address review feedback: validation, cleanup, and timeout handling - ports.py: Add try/except for AUTEST_PORT_OFFSET parsing with range clamping - Make --ats-bin conditionally required (not needed for --list mode) - Add timeout handling for verbose mode subprocess.Popen.wait() - Update load_serial_tests docstring to match actual basename behavior - Remove unused current_test and test_start_line variables - Add explanatory comments to exception handlers --- tests/autest-parallel.py | 37 +++++++++++++++------------ tests/gold_tests/autest-site/ports.py | 8 +++++- 2 files changed, 27 insertions(+), 
18 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index d98068bc339..99c6da3812b 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -98,12 +98,12 @@ def load_serial_tests(serial_file: Path) -> set: Load list of tests that must run serially from a file. The file format is one test name per line, with # for comments. - Test names can be: - - Simple names: test_name (matches any test containing this) - - Full paths: subdir/test_name.test.py + Test names can be full paths like ``subdir/test_name.test.py``. + The .test.py extension is stripped, and only the basename (stem) is + used for matching against discovered test names. Returns: - Set of test names that must run serially + Set of test base names that must run serially """ serial_tests = set() if not serial_file.exists(): @@ -123,7 +123,7 @@ def load_serial_tests(serial_file: Path) -> set: test_name = Path(line).stem.replace('.test', '') serial_tests.add(test_name) except IOError: - pass + pass # File is optional; missing file means no serial tests return serial_tests @@ -135,7 +135,7 @@ def load_timings(timing_file: Path) -> Dict[str, float]: with open(timing_file) as f: return json.load(f) except (json.JSONDecodeError, IOError): - pass + pass # Timing data is optional; fall back to equal partitioning return {} @@ -253,14 +253,6 @@ def parse_autest_output(output: str) -> dict: clean_output = strip_ansi(output) lines = clean_output.split('\n') - # Track test start times to calculate duration - # Autest output format: - # Running Test: test_name - # ... test output ... - # Test: test_name: Passed/Failed - current_test = None - test_start_line = None - # First pass: find test results and their line positions test_results = [] # (line_num, test_name, result) for i, line in enumerate(lines): @@ -474,7 +466,9 @@ def run_worker( try: if verbose: - # Stream output in real-time so the user sees test progress + # Stream output in real-time so the user sees test progress. + # We use Popen + line-by-line read so partial results are visible + # even if the overall run takes a long time. 
proc = subprocess.Popen( cmd, cwd=script_dir, @@ -492,7 +486,12 @@ def run_worker( if clean.startswith('Running Test'): ts = datetime.now().strftime("%H:%M:%S") print(f" [{ts}] Worker:{worker_id:2d} {clean}", flush=True) - proc.wait(timeout=3600) + # stdout is exhausted, wait for process to finish + try: + proc.wait(timeout=60) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() result.output = ''.join(output_lines) result.return_code = proc.returncode else: @@ -648,7 +647,7 @@ def main(): parser.add_argument( '-j', '--jobs', type=int, default=os.cpu_count() or 4, help='Number of parallel workers (default: CPU count)') - parser.add_argument('--ats-bin', required=True, help='Path to ATS bin directory') + parser.add_argument('--ats-bin', default=None, help='Path to ATS bin directory (required unless --list is used)') parser.add_argument( '--build-root', default=None, @@ -680,6 +679,10 @@ def main(): args = parser.parse_args() + # --ats-bin is required unless --list is used + if not args.list and not args.ats_bin: + parser.error("--ats-bin is required when running tests (not needed for --list)") + # Determine paths script_dir = Path(__file__).parent.resolve() test_dir = script_dir / args.test_dir diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py index adb28cc3f24..fc68367fc15 100644 --- a/tests/gold_tests/autest-site/ports.py +++ b/tests/gold_tests/autest-site/ports.py @@ -161,7 +161,13 @@ def _setup_port_queue(amount=1000): return # Get port offset for parallel execution support - port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0)) + try: + port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0)) + except ValueError: + host.WriteWarning("AUTEST_PORT_OFFSET is not a valid integer, defaulting to 0") + port_offset = 0 + # Clamp to a safe range to avoid exceeding the valid port space + port_offset = max(0, min(port_offset, 60000)) if port_offset > 0: host.WriteVerbose('_setup_port_queue', f"Using port offset: {port_offset}") From f2792fa6004dd5a039c2e0ddf7cc8295d3e3cd41 Mon Sep 17 00:00:00 2001 From: Brian Neradt Date: Tue, 10 Feb 2026 19:51:07 +0000 Subject: [PATCH 10/11] Fix parallel autest runner for build-tree execution The parallel runner from PR #12867 assumed it ran from the source tree, but the project convention is to run tests from the build directory where uv, pyproject.toml, and compiled test plugins reside. This commit converts autest-parallel.py to a CMake .in template so configure_file substitutes the correct absolute paths for gold_tests, proxy-verifier, serial_tests.txt, and PYTHONPATH. It also adds --proxy-verifier-bin and PYTHONPATH setup that the original script was missing (both are required by the autest test harness), and integrates the runner into autest.sh so that passing -j N delegates to autest-parallel.py while the default sequential path is unchanged. In addition, several resource management and correctness issues identified during code review are addressed: The standalone tests/autest-parallel.py was a stale copy of the CMake template. Since CMake already generates it in the build tree from the .py.in template, the source-tree copy is removed. In autest.sh.in, the CURL_UDS_FLAG block (which excludes h2, tls, and tls_hooks tests when curl lacks Unix Domain Socket support) ran after the parallel dispatch and used "exec", so it never executed for parallel runs. 
The block is moved before the dispatch, "exec" is replaced with a regular invocation so the cleanup code always runs, and CURL_UDS_FLAG and AUTEST_OPTIONS are forwarded to the parallel runner. In the parallel runner itself: the verbose-mode Popen path now has a try/finally to prevent child process leaks on exceptions; both the collect_timings and batch code paths catch KeyboardInterrupt and return clean results with exit code 130; the main function wraps the entire execution phase in try/except KeyboardInterrupt to collect partial results and exit cleanly on Ctrl+C; SIGTERM is translated to KeyboardInterrupt via a signal handler for consistent cleanup. The default test-timings.json location is moved from the source tree to the sandbox directory to avoid polluting the repository. The -f/--filter argument is changed from action='append' to nargs='+' with action='extend' so that "-f test1 test2 test3" works, matching sequential autest behavior and CI pipeline usage. --- doc/developer-guide/testing/autests.en.rst | 9 + tests/CMakeLists.txt | 1 + ...test-parallel.py => autest-parallel.py.in} | 362 +++++++++++------- tests/autest.sh.in | 49 ++- 4 files changed, 263 insertions(+), 158 deletions(-) rename tests/{autest-parallel.py => autest-parallel.py.in} (74%) diff --git a/doc/developer-guide/testing/autests.en.rst b/doc/developer-guide/testing/autests.en.rst index 29b0de15576..64cf40fe4f9 100644 --- a/doc/developer-guide/testing/autests.en.rst +++ b/doc/developer-guide/testing/autests.en.rst @@ -73,6 +73,15 @@ For example, to run ``cache-auth.test.py``: ./autest.sh --sandbox /tmp/sbcursor --clean=none -f cache-auth +To run tests in parallel, pass ``-j N`` where ``N`` is the number of worker +processes. Each worker gets an isolated port range to avoid conflicts: + +.. code-block:: bash + + ./autest.sh -j 10 --sandbox /tmp/sbcursor -f cache-auth -f cache-control + +Without ``-j``, tests run sequentially. 
+ Recommended Approach: ATSReplayTest ==================================== diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 95b6a0f331d..d61f530ab37 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -57,6 +57,7 @@ endif() configure_file(pyproject.toml pyproject.toml COPYONLY) configure_file(autest.sh.in autest.sh) +configure_file(autest-parallel.py.in autest-parallel.py) add_custom_target( autest diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py.in similarity index 74% rename from tests/autest-parallel.py rename to tests/autest-parallel.py.in index 99c6da3812b..a79fd080327 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py.in @@ -31,6 +31,7 @@ import json import os import re +import signal import subprocess import sys import time @@ -40,10 +41,8 @@ from pathlib import Path from typing import Dict, List, Optional, Tuple -# Default timing file location -DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json" # Default serial tests file location -DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt" +DEFAULT_SERIAL_TESTS_FILE = Path("${CMAKE_CURRENT_SOURCE_DIR}") / "serial_tests.txt" # Default estimate for unknown tests (seconds) DEFAULT_TEST_TIME = 15.0 @@ -330,7 +329,8 @@ def run_single_test(test: str, script_dir: Path, sandbox: Path, ats_bin: str, bu status is one of: "PASS", "FAIL", "SKIP" """ cmd = [ - 'uv', 'run', 'autest', 'run', '--directory', 'gold_tests', '--ats-bin', ats_bin, '--build-root', build_root, '--sandbox', + 'uv', 'run', 'autest', 'run', '--directory', '${CMAKE_GOLD_DIR}', '--ats-bin', ats_bin, '--proxy-verifier-bin', + '${PROXY_VERIFIER_PATH}', '--build-root', build_root, '--sandbox', str(sandbox / test), '--filters', test ] cmd.extend(extra_args) @@ -405,33 +405,45 @@ def run_worker( # Calculate port offset for this worker port_offset = worker_id * port_offset_step - # Set up environment with port offset + # Set up environment with port offset and PYTHONPATH for test extensions. 
env = os.environ.copy() env['AUTEST_PORT_OFFSET'] = str(port_offset) + pythonpath_dirs = [ + '${CMAKE_CURRENT_SOURCE_DIR}/gold_tests/remap', + '${CMAKE_CURRENT_SOURCE_DIR}/gold_tests/lib', + ] + existing = env.get('PYTHONPATH', '') + env['PYTHONPATH'] = ':'.join(pythonpath_dirs + ([existing] if existing else [])) if collect_timings: # Run tests one at a time to collect accurate timing all_output = [] total_tests = len(tests) - for idx, test in enumerate(tests, 1): - test_name, duration, status, output = run_single_test(test, script_dir, sandbox, ats_bin, build_root, extra_args, env) - result.test_timings[test_name] = duration - all_output.append(output) - - if status == "PASS": - result.passed += 1 - elif status == "SKIP": - result.skipped += 1 - else: - result.failed += 1 - result.failed_tests.append(test_name) - - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - # Fixed-width format: date time status duration worker progress test_name - print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True) + try: + for idx, test in enumerate(tests, 1): + test_name, duration, status, output = run_single_test( + test, script_dir, sandbox, ats_bin, build_root, extra_args, env) + result.test_timings[test_name] = duration + all_output.append(output) + + if status == "PASS": + result.passed += 1 + elif status == "SKIP": + result.skipped += 1 + else: + result.failed += 1 + result.failed_tests.append(test_name) + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Fixed-width format: date time status duration worker progress test_name + print( + f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True) + except KeyboardInterrupt: + result.return_code = 130 result.output = "\n".join(all_output) - result.return_code = 0 if result.failed == 0 else 1 + if result.return_code != 130: + result.return_code = 0 if result.failed == 0 else 1 else: # Run all tests in batch (faster but no per-test timing) cmd = [ @@ -440,9 +452,11 @@ def run_worker( 'autest', 'run', '--directory', - 'gold_tests', + '${CMAKE_GOLD_DIR}', '--ats-bin', ats_bin, + '--proxy-verifier-bin', + '${PROXY_VERIFIER_PATH}', '--build-root', build_root, '--sandbox', @@ -478,20 +492,30 @@ def run_worker( env=env, ) output_lines = [] - for line in proc.stdout: - output_lines.append(line) - # Print lines that show test progress - clean = strip_ansi(line).strip() - if clean.startswith('Running Test') or 'Passed' in clean or 'Failed' in clean: - if clean.startswith('Running Test'): - ts = datetime.now().strftime("%H:%M:%S") - print(f" [{ts}] Worker:{worker_id:2d} {clean}", flush=True) - # stdout is exhausted, wait for process to finish try: - proc.wait(timeout=60) - except subprocess.TimeoutExpired: - proc.kill() - proc.wait() + for line in proc.stdout: + output_lines.append(line) + # Print lines that show test progress + clean = strip_ansi(line).strip() + if clean.startswith('Running Test') or 'Passed' in clean or 'Failed' in clean: + if clean.startswith('Running Test'): + ts = datetime.now().strftime("%H:%M:%S") + print(f" [{ts}] Worker:{worker_id:2d} {clean}", flush=True) + # stdout is exhausted, wait for process to finish + try: + proc.wait(timeout=60) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + finally: + # Ensure the subprocess is always cleaned up. 
+ if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() result.output = ''.join(output_lines) result.return_code = proc.returncode else: @@ -524,6 +548,10 @@ def run_worker( result.failed = len(tests) result.failed_tests = list(tests) + except KeyboardInterrupt: + result.output = "INTERRUPTED by user" + result.return_code = 130 + result.failed = len(tests) except subprocess.TimeoutExpired: result.output = "TIMEOUT: Worker exceeded 1 hour timeout" result.return_code = -1 @@ -623,7 +651,14 @@ def print_summary(results: List[TestResult], total_duration: float, expected_tim f"{total_skipped:3d} skipped ({total_duration:6.1f}s)") +def _sigterm_handler(signum, frame): + """Translate SIGTERM into KeyboardInterrupt so the same cleanup path runs.""" + raise KeyboardInterrupt + + def main(): + signal.signal(signal.SIGTERM, _sigterm_handler) + parser = argparse.ArgumentParser( description='Run autest tests in parallel', formatter_class=argparse.RawDescriptionHelpFormatter, @@ -655,18 +690,20 @@ def main(): 'Defaults to the source tree root.') parser.add_argument('--sandbox', default='/tmp/autest-parallel', help='Base sandbox directory (default: /tmp/autest-parallel)') parser.add_argument( - '-f', '--filter', action='append', dest='filters', help='Filter tests by glob pattern (can be specified multiple times)') + '-f', + '--filter', + nargs='+', + action='extend', + dest='filters', + help='Filter tests by name or glob pattern (multiple names can follow a single -f)') parser.add_argument('--list', action='store_true', help='List tests without running') parser.add_argument('--port-offset-step', type=int, default=1000, help='Port offset between workers (default: 1000)') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') - parser.add_argument('--test-dir', default='gold_tests', help='Test directory relative to script location (default: gold_tests)') + parser.add_argument('--test-dir', default='${CMAKE_GOLD_DIR}', help='Path to gold_tests directory (default: ${CMAKE_GOLD_DIR})') parser.add_argument( '--collect-timings', action='store_true', help='Run tests one at a time to collect accurate per-test timing data') parser.add_argument( - '--timings-file', - type=Path, - default=DEFAULT_TIMING_FILE, - help=f'Path to timing data JSON file (default: {DEFAULT_TIMING_FILE})') + '--timings-file', type=Path, default=None, help='Path to timing data JSON file (default: /test-timings.json)') parser.add_argument( '--no-timing', action='store_true', help='Disable timing-based load balancing (use round-robin partitioning)') parser.add_argument( @@ -675,9 +712,12 @@ def main(): default=DEFAULT_SERIAL_TESTS_FILE, help=f'Path to file listing tests that must run serially (default: {DEFAULT_SERIAL_TESTS_FILE})') parser.add_argument('--no-serial', action='store_true', help='Skip serial tests entirely') - parser.add_argument('extra_args', nargs='*', help='Additional arguments to pass to autest') + args, unknown_args = parser.parse_known_args() + args.extra_args = unknown_args - args = parser.parse_args() + # Default timing file to sandbox to avoid writing into the source tree. 
+ if args.timings_file is None: + args.timings_file = Path(args.sandbox) / "test-timings.json" # --ats-bin is required unless --list is used if not args.list and not args.ats_bin: @@ -815,110 +855,137 @@ def print_progress(phase: str = "Parallel"): # Run workers in parallel start_time = time.time() results: List[TestResult] = [] + interrupted = False + futures = {} + serial_result = None + serial_start = None - if partitions: - print_progress() - with ProcessPoolExecutor(max_workers=len(partitions)) as executor: - futures = {} - for worker_id, worker_tests in enumerate(partitions): - future = executor.submit( - run_worker, - worker_id=worker_id, - tests=worker_tests, - script_dir=script_dir, - sandbox_base=sandbox_base, - ats_bin=args.ats_bin, - build_root=build_root, - extra_args=args.extra_args or [], - port_offset_step=args.port_offset_step, - verbose=args.verbose, - collect_timings=args.collect_timings) - futures[future] = worker_id - - # Collect results as they complete - for future in as_completed(futures): - worker_id = futures[future] + try: + if partitions: + print_progress() + with ProcessPoolExecutor(max_workers=len(partitions)) as executor: + for worker_id, worker_tests in enumerate(partitions): + future = executor.submit( + run_worker, + worker_id=worker_id, + tests=worker_tests, + script_dir=script_dir, + sandbox_base=sandbox_base, + ats_bin=args.ats_bin, + build_root=build_root, + extra_args=args.extra_args or [], + port_offset_step=args.port_offset_step, + verbose=args.verbose, + collect_timings=args.collect_timings) + futures[future] = worker_id + + # Collect results as they complete + for future in as_completed(futures): + worker_id = futures[future] + try: + result = future.result() + results.append(result) + workers_done += 1 + # Use actual test count (top-level), not autest sub-test counts + tests_done += len(result.tests) + # Count top-level tests that failed (from failed_tests list) + tests_failed += len(result.failed_tests) + # Skipped is still useful from autest counts for visibility + tests_skipped += result.skipped + + if args.verbose: + # In verbose mode, print detail line then progress + status = "PASS" if result.failed == 0 else "FAIL" + ts = datetime.now().strftime("%H:%M:%S") + parts = [f"{result.passed} passed", f"{result.failed} failed"] + if result.skipped > 0: + parts.append(f"{result.skipped} skipped") + # Clear the progress line, print detail, then re-print progress + print( + f"\r[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} " + f"({result.duration:.1f}s) [{status}]" + " " * 20) + + print_progress() + except Exception as e: + print(f"\r[Worker {worker_id}] Error: {e}" + " " * 20, file=sys.stderr) + workers_done += 1 + tests_done += len(partitions[worker_id]) + tests_failed += len(partitions[worker_id]) + results.append( + TestResult( + worker_id=worker_id, tests=partitions[worker_id], failed=len(partitions[worker_id]), output=str(e))) + print_progress() + + # Clear the progress line after parallel phase + print() + + # Run serial tests after parallel tests complete + if serial_tests_to_run and not args.no_serial: + print(f"{'=' * 70}") + print("RUNNING SERIAL TESTS") + print(f"{'=' * 70}") + serial_start = time.time() + + # Use a special worker ID for serial tests (after parallel workers) + serial_worker_id = len(partitions) if partitions else 0 + + # Set up environment without port offset (serial tests run alone) + env = os.environ.copy() + env['AUTEST_PORT_OFFSET'] = '0' + + serial_result = TestResult(worker_id=serial_worker_id, 
tests=serial_tests_to_run, is_serial=True) + + for idx, test in enumerate(serial_tests_to_run, 1): + test_name, duration, status, output = run_single_test( + test, script_dir, sandbox_base / "serial", args.ats_bin, build_root, args.extra_args or [], env) + serial_result.test_timings[test_name] = duration + + if status == "PASS": + serial_result.passed += 1 + elif status == "SKIP": + serial_result.skipped += 1 + tests_skipped += 1 + else: + serial_result.failed += 1 + serial_result.failed_tests.append(test_name) + tests_failed += 1 + tests_done += 1 + + if args.verbose: + timestamp = datetime.now().strftime("%H:%M:%S") + print( + f"\r[{timestamp}] {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}" + + " " * 20) + + print_progress(phase="Serial") + + serial_result.duration = time.time() - serial_start + results.append(serial_result) + + # Clear the progress line after serial phase + print() + + except KeyboardInterrupt: + interrupted = True + print("\n\nInterrupted! Collecting completed results...") + # Collect any parallel worker results that completed before the + # interrupt. The ProcessPoolExecutor's shutdown(wait=True) in __exit__ + # will have waited for workers (which also received SIGINT) to finish, + # so completed futures are available here. + seen_workers = {r.worker_id for r in results} + for future in futures: + if future.done() and not future.cancelled(): try: - result = future.result() - results.append(result) - workers_done += 1 - # Use actual test count (top-level), not autest sub-test counts - tests_done += len(result.tests) - # Count top-level tests that failed (from failed_tests list) - tests_failed += len(result.failed_tests) - # Skipped is still useful from autest counts for visibility - tests_skipped += result.skipped - - if args.verbose: - # In verbose mode, print detail line then progress - status = "PASS" if result.failed == 0 else "FAIL" - ts = datetime.now().strftime("%H:%M:%S") - parts = [f"{result.passed} passed", f"{result.failed} failed"] - if result.skipped > 0: - parts.append(f"{result.skipped} skipped") - # Clear the progress line, print detail, then re-print progress - print( - f"\r[{ts}] Worker:{worker_id:2d} Done: {', '.join(parts)} " - f"({result.duration:.1f}s) [{status}]" + " " * 20) - - print_progress() - except Exception as e: - print(f"\r[Worker {worker_id}] Error: {e}" + " " * 20, file=sys.stderr) - workers_done += 1 - tests_done += len(partitions[worker_id]) - tests_failed += len(partitions[worker_id]) - results.append( - TestResult( - worker_id=worker_id, tests=partitions[worker_id], failed=len(partitions[worker_id]), output=str(e))) - print_progress() - - # Clear the progress line after parallel phase - print() - - # Run serial tests after parallel tests complete - if serial_tests_to_run and not args.no_serial: - print(f"{'=' * 70}") - print("RUNNING SERIAL TESTS") - print(f"{'=' * 70}") - serial_start = time.time() - - # Use a special worker ID for serial tests (after parallel workers) - serial_worker_id = len(partitions) if partitions else 0 - - # Set up environment without port offset (serial tests run alone) - env = os.environ.copy() - env['AUTEST_PORT_OFFSET'] = '0' - - serial_result = TestResult(worker_id=serial_worker_id, tests=serial_tests_to_run, is_serial=True) - - for idx, test in enumerate(serial_tests_to_run, 1): - test_name, duration, status, output = run_single_test( - test, script_dir, sandbox_base / "serial", args.ats_bin, build_root, args.extra_args or [], env) - 
serial_result.test_timings[test_name] = duration - - if status == "PASS": - serial_result.passed += 1 - elif status == "SKIP": - serial_result.skipped += 1 - tests_skipped += 1 - else: - serial_result.failed += 1 - serial_result.failed_tests.append(test_name) - tests_failed += 1 - tests_done += 1 - - if args.verbose: - timestamp = datetime.now().strftime("%H:%M:%S") - print( - f"\r[{timestamp}] {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}" + " " * 20) - - print_progress(phase="Serial") - - serial_result.duration = time.time() - serial_start - results.append(serial_result) - - # Clear the progress line after serial phase - print() + r = future.result(timeout=0) + if r.worker_id not in seen_workers: + results.append(r) + seen_workers.add(r.worker_id) + except Exception: + pass + # Capture partial serial results if we were in the serial phase. + if serial_result is not None and serial_result.worker_id not in seen_workers: + serial_result.duration = time.time() - serial_start + results.append(serial_result) total_duration = time.time() - start_time @@ -942,6 +1009,9 @@ def print_progress(phase: str = "Parallel"): print_summary(results, total_duration, timings if args.collect_timings else None) # Exit with non-zero if any tests failed + if interrupted: + print("\nRun was interrupted by user.") + sys.exit(130) total_failed = sum(r.failed + r.exceptions for r in results) sys.exit(1 if total_failed > 0 else 0) diff --git a/tests/autest.sh.in b/tests/autest.sh.in index 6554f42ee63..b4edfff9ea4 100755 --- a/tests/autest.sh.in +++ b/tests/autest.sh.in @@ -1,13 +1,18 @@ #!/bin/bash # -# conveinience script for running autest after building the target +# convenience script for running autest after building the target # +# Usage: +# ./autest.sh [autest args] # run tests sequentially +# ./autest.sh -j N [autest args] # run tests in parallel with N workers # export LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib export PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}/gold_tests/remap:${CMAKE_CURRENT_SOURCE_DIR}/gold_tests/lib:$PYTHONPATH -# Define tests to skip for CURL_UDS_FLAG +# Move test directories that require features not supported by the current curl +# build. This must happen before the parallel runner dispatches because it +# discovers tests by scanning the gold_tests directory. if [ -n "${CURL_UDS_FLAG}" ]; then mkdir -p "${CMAKE_SKIP_GOLD_DIR}" if [ -d "${CMAKE_GOLD_DIR}/h2" ]; then @@ -21,17 +26,37 @@ if [ -n "${CURL_UDS_FLAG}" ]; then fi fi -uv run autest \ - --sandbox ${AUTEST_SANDBOX} \ - --directory ${CMAKE_GOLD_DIR} \ - --ats-bin=${CMAKE_INSTALL_PREFIX}/bin \ - --proxy-verifier-bin ${PROXY_VERIFIER_PATH} \ - --build-root ${CMAKE_BINARY_DIR} \ - ${CURL_UDS_FLAG} ${AUTEST_OPTIONS} \ - "$@" -autest_exit=$? +# Check whether parallel mode was requested. +parallel_mode=false +for arg in "$@"; do + case "$arg" in + -j|-j[0-9]*|--jobs|--jobs=*) + parallel_mode=true + break + ;; + esac +done -# Restore tests back to source tree and remove temp dir +if $parallel_mode; then + python3 "$(dirname "$0")/autest-parallel.py" \ + --ats-bin=${CMAKE_INSTALL_PREFIX}/bin \ + --build-root ${CMAKE_BINARY_DIR} \ + --sandbox ${AUTEST_SANDBOX} \ + ${CURL_UDS_FLAG} ${AUTEST_OPTIONS} "$@" + autest_exit=$? +else + uv run autest \ + --sandbox ${AUTEST_SANDBOX} \ + --directory ${CMAKE_GOLD_DIR} \ + --ats-bin=${CMAKE_INSTALL_PREFIX}/bin \ + --proxy-verifier-bin ${PROXY_VERIFIER_PATH} \ + --build-root ${CMAKE_BINARY_DIR} \ + ${CURL_UDS_FLAG} ${AUTEST_OPTIONS} \ + "$@" + autest_exit=$? 
+fi + +# Restore tests back to source tree and remove temp dir. if [ -n "${CURL_UDS_FLAG}" ]; then if [ -d "${CMAKE_SKIP_GOLD_DIR}/h2" ]; then mv "${CMAKE_SKIP_GOLD_DIR}/h2" "${CMAKE_GOLD_DIR}/h2" From cdb469bf94f302dc0e316478a5e3d889341868d0 Mon Sep 17 00:00:00 2001 From: Brian Neradt Date: Tue, 10 Feb 2026 19:51:34 +0000 Subject: [PATCH 11/11] Default autest sandbox to /tmp and fix temp file hygiene The AUTEST_SANDBOX CMake cache variable previously defaulted to ${CMAKE_BINARY_DIR}/_sandbox, placing test output inside the build tree. This changes the default to /tmp/sb_, where is the first 8 characters of the MD5 of CMAKE_BINARY_DIR. This keeps sandbox paths short (Unix domain socket paths are limited to 108 characters, and deep build directories in home directories can exceed this, causing confusing test failures) while providing deterministic per-build isolation. The variable remains a CACHE STRING, so existing builds retain their configured value and users can still override it via -DAUTEST_SANDBOX=. Additionally, fix two pre-existing test hygiene problems that leave stale files behind after test runs: 1. traffic_ctl_test_utils.py: MakeGoldFileWithText was writing generated gold files directly into the source tree (tests/gold_tests/traffic_ctl/gold/) via Test.TestDirectory. This caused untracked .gold files to accumulate in the repository. The fix writes generated gold files to a process-unique temporary directory (via tempfile.mkdtemp) that is cleaned up at process exit via an atexit handler. 2. remap_acl.test.py: Two loops used tempfile.mkstemp() to create .replay files in /tmp but never closed the returned file descriptors and never cleaned up the files. The fix creates a dedicated temporary directory for replay files (also cleaned up at process exit via atexit), properly closes the file descriptors, and directs mkstemp to write into that managed directory. --- CMakeLists.txt | 12 +++-- tests/gold_tests/remap/remap_acl.test.py | 13 ++++- .../traffic_ctl/traffic_ctl_test_utils.py | 51 +++++++++++-------- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7df4f8b63ca..2e6b59f271c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -632,10 +632,16 @@ if(ENABLE_DOCS) endif() if(ENABLE_AUTEST) + # Default the sandbox to /tmp to keep paths short. Unix domain socket paths + # are limited to 108 characters and deep build directories (e.g. in home + # directories) can exceed this limit, causing confusing test failures. A hash + # of CMAKE_BINARY_DIR provides per-build isolation while keeping the path + # deterministic across runs. + string(MD5 _build_dir_hash "${CMAKE_BINARY_DIR}") + string(SUBSTRING "${_build_dir_hash}" 0 8 _build_dir_hash) set(AUTEST_SANDBOX - ${CMAKE_BINARY_DIR}/_sandbox - CACHE STRING "Location for autest output (default - CMAKE_BINARY_DIR/_sandbox)" + "/tmp/sb_${_build_dir_hash}" + CACHE STRING "Location for autest output (default /tmp/sb_)" ) set(AUTEST_OPTIONS "" diff --git a/tests/gold_tests/remap/remap_acl.test.py b/tests/gold_tests/remap/remap_acl.test.py index b1665bf895e..6aec7b8ad33 100644 --- a/tests/gold_tests/remap/remap_acl.test.py +++ b/tests/gold_tests/remap/remap_acl.test.py @@ -15,10 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import atexit import os import io import re import inspect +import shutil import tempfile from yaml import load, dump from yaml import CLoader as Loader @@ -26,6 +28,11 @@ from ports import get_port +# Temporary directory for generated replay files. Cleaned up at process exit +# so the files don't accumulate in /tmp across test runs. +_replay_tmpdir = tempfile.mkdtemp(prefix='autest_replay_') +atexit.register(shutil.rmtree, _replay_tmpdir, True) + Test.Summary = ''' Verify remap.config acl behavior. ''' @@ -588,7 +595,8 @@ def replay_proxy_response(filename, replay_file, get_proxy_response, post_proxy_ Test all acl combinations """ for idx, test in enumerate(all_acl_combination_tests): - (_, replay_file_name) = tempfile.mkstemp(suffix="acl_table_test_{}.replay".format(idx)) + (fd, replay_file_name) = tempfile.mkstemp(suffix="acl_table_test_{}.replay".format(idx), dir=_replay_tmpdir) + os.close(fd) replay_proxy_response( "base.replay.yaml", replay_file_name, @@ -614,7 +622,8 @@ def replay_proxy_response(filename, replay_file, get_proxy_response, post_proxy_ test["deactivate_ip_allow"] except: print(test) - (_, replay_file_name) = tempfile.mkstemp(suffix="deactivate_ip_allow_table_test_{}.replay".format(idx)) + (fd, replay_file_name) = tempfile.mkstemp(suffix="deactivate_ip_allow_table_test_{}.replay".format(idx), dir=_replay_tmpdir) + os.close(fd) replay_proxy_response( "base.replay.yaml", replay_file_name, diff --git a/tests/gold_tests/traffic_ctl/traffic_ctl_test_utils.py b/tests/gold_tests/traffic_ctl/traffic_ctl_test_utils.py index a40d60e86bd..b4c2012238f 100644 --- a/tests/gold_tests/traffic_ctl/traffic_ctl_test_utils.py +++ b/tests/gold_tests/traffic_ctl/traffic_ctl_test_utils.py @@ -14,36 +14,43 @@ # See the License for the specific language governing permissions and # limitations under the License. +import atexit import os +import shutil +import tempfile -# This function can(eventually) be used to have a single yaml file and read nodes from it. -# The idea would be to avoid having multiple gold files with yaml content. -# The only issue would be the comments, this is because how the yaml lib reads yaml, -# comments aren't rendered in the same way as traffic_ctl throws it, it should only -# be used if no comments need to be compared. -# -# def GoldFilePathFor(node:str, main_file="gold/test_gold_file.yaml"): -# if node == "": -# raise Exception("node should not be empty") +_gold_tmpdir = None -# yaml = ruamel.yaml.YAML() -# yaml.indent(sequence=4, offset=2) -# with open(os.path.join(Test.TestDirectory, main_file), 'r') as f: -# content = yaml.load(f) -# node_data = content[node] -# data_dirname = 'generated_gold_files' -# data_path = os.path.join(Test.TestDirectory, data_dirname) -# os.makedirs(data_path, exist_ok=True) -# gold_filepath = os.path.join(data_path, f'test_{TestNumber}.gold') -# with open(os.path.join(data_path, f'test_{TestNumber}.gold'), 'w') as gold_file: -# yaml.dump(node_data, gold_file) +def _get_gold_tmpdir(): + """Return a temporary directory for generated gold files. -# return gold_filepath + The directory is created on first call and registered for cleanup at + process exit so generated gold files never accumulate in /tmp. 
+ """ + global _gold_tmpdir + if _gold_tmpdir is None: + _gold_tmpdir = tempfile.mkdtemp(prefix='autest_gold_') + atexit.register(shutil.rmtree, _gold_tmpdir, True) + return _gold_tmpdir def MakeGoldFileWithText(content, dir, test_number, add_new_line=True): - data_path = os.path.join(dir, "gold") + """Write expected-output text to a temporary gold file and return its path. + + The gold file is placed in a process-unique temporary directory rather than + the source tree so that generated files don't pollute the repository. + + Args: + content: The expected output text. + dir: Unused (kept for API compatibility). + test_number: Numeric identifier used to name the gold file. + add_new_line: If True, append a trailing newline to content. + + Returns: + Absolute path to the generated gold file. + """ + data_path = os.path.join(_get_gold_tmpdir(), "gold") os.makedirs(data_path, exist_ok=True) gold_filepath = os.path.join(data_path, f'test_{test_number}.gold') with open(gold_filepath, 'w') as gold_file: