diff --git a/openevolve/process_parallel.py b/openevolve/process_parallel.py
index a2fd6592a..352c52bc8 100644
--- a/openevolve/process_parallel.py
+++ b/openevolve/process_parallel.py
@@ -5,14 +5,11 @@
 import asyncio
 import logging
 import multiprocessing as mp
-import pickle
-import signal
 import time
-from concurrent.futures import Future, ProcessPoolExecutor
+from concurrent.futures import BrokenExecutor, Future, ProcessPoolExecutor
 from concurrent.futures import TimeoutError as FutureTimeoutError
 from dataclasses import asdict, dataclass
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
 
 from openevolve.config import Config
 from openevolve.database import Program, ProgramDatabase
@@ -357,6 +354,10 @@ def __init__(
         self.num_workers = config.evaluator.parallel_evaluations
         self.num_islands = config.database.num_islands
 
+        # Recovery tracking for process pool crashes
+        self.recovery_attempts = 0
+        self.max_recovery_attempts = 3
+
         logger.info(f"Initialized process parallel controller with {self.num_workers} workers")
 
     def _serialize_config(self, config: Config) -> dict:
@@ -434,6 +435,38 @@ def stop(self) -> None:
 
         logger.info("Stopped process pool")
 
+    def _recover_process_pool(self, failed_iterations: list[int] | None = None) -> None:
+        """Recover from a crashed process pool by recreating it.
+
+        Args:
+            failed_iterations: List of iteration numbers that failed and need re-queuing
+        """
+        import gc
+
+        logger.warning("Process pool crashed, attempting recovery...")
+
+        # Shutdown broken executor without waiting (it's already broken)
+        if self.executor:
+            try:
+                self.executor.shutdown(wait=False, cancel_futures=True)
+            except Exception:
+                pass  # Executor may already be in bad state
+            self.executor = None
+
+        # Force garbage collection to free memory before restarting
+        gc.collect()
+
+        # Brief delay to let system stabilize (memory freed, processes cleaned up)
+        time.sleep(2.0)
+
+        # Recreate the pool
+        self.start()
+
+        if failed_iterations:
+            logger.info(f"Pool recovered. {len(failed_iterations)} iterations will be re-queued.")
+        else:
+            logger.info("Pool recovered successfully.")
+
     def request_shutdown(self) -> None:
         """Request graceful shutdown"""
         logger.info("Graceful shutdown requested...")
@@ -559,6 +592,14 @@ async def run_evolution(
                     # Reconstruct program from dict
                     child_program = Program(**result.child_program_dict)
 
+                    # Reset recovery counter on successful iteration
+                    if self.recovery_attempts > 0:
+                        logger.info(
+                            f"Pool stable after recovery, resetting recovery counter "
+                            f"(was {self.recovery_attempts})"
+                        )
+                        self.recovery_attempts = 0
+
                     # Add to database with explicit target_island to ensure proper island placement
                     # This fixes issue #391: children should go to the target island, not inherit
                     # from the parent (which may be from a different island due to fallback sampling)
@@ -752,6 +793,38 @@ async def run_evolution(
                     )
                     # Cancel the future to clean up the process
                     future.cancel()
+                except BrokenExecutor as e:
+                    logger.error(f"Process pool crashed during iteration {completed_iteration}: {e}")
+
+                    # Collect all failed iterations from pending futures
+                    failed_iterations = [completed_iteration] + list(pending_futures.keys())
+
+                    # Clear pending futures (they're all invalid now)
+                    pending_futures.clear()
+                    for island_id in island_pending:
+                        island_pending[island_id].clear()
+
+                    # Attempt recovery
+                    self.recovery_attempts += 1
+                    if self.recovery_attempts > self.max_recovery_attempts:
+                        logger.error(
+                            f"Max recovery attempts ({self.max_recovery_attempts}) exceeded. "
+                            f"Stopping evolution."
+                        )
+                        break
+
+                    self._recover_process_pool(failed_iterations)
+
+                    # Re-queue failed iterations (distribute across islands)
+                    for i, failed_iter in enumerate(failed_iterations):
+                        if failed_iter < total_iterations:
+                            island_id = i % self.num_islands
+                            future = self._submit_iteration(failed_iter, island_id)
+                            if future:
+                                pending_futures[failed_iter] = future
+                                island_pending[island_id].append(failed_iter)
+
+                    continue
                 except Exception as e:
                     logger.error(f"Error processing result from iteration {completed_iteration}: {e}")
 
@@ -822,6 +895,9 @@ def _submit_iteration(
 
             return future
 
+        except BrokenExecutor:
+            # Let this propagate up to run_evolution for recovery
+            raise
         except Exception as e:
            logger.error(f"Error submitting iteration {iteration}: {e}")
            return None
diff --git a/tests/test_process_pool_recovery.py b/tests/test_process_pool_recovery.py
new file mode 100644
index 000000000..d79ccb21c
--- /dev/null
+++ b/tests/test_process_pool_recovery.py
@@ -0,0 +1,157 @@
+"""
+Tests for process pool crash recovery
+"""
+
+import asyncio
+import os
+import tempfile
+import unittest
+from unittest.mock import MagicMock, patch
+from concurrent.futures import BrokenExecutor, Future
+
+# Set dummy API key for testing
+os.environ["OPENAI_API_KEY"] = "test"
+
+from openevolve.config import Config
+from openevolve.database import Program, ProgramDatabase
+from openevolve.process_parallel import ProcessParallelController, SerializableResult
+
+
+class TestProcessPoolRecovery(unittest.TestCase):
+    """Tests for process pool crash recovery"""
+
+    def setUp(self):
+        """Set up test environment"""
+        self.test_dir = tempfile.mkdtemp()
+
+        # Create test config
+        self.config = Config()
+        self.config.max_iterations = 10
+        self.config.evaluator.parallel_evaluations = 2
+        self.config.evaluator.timeout = 10
+        self.config.database.num_islands = 2
+        self.config.database.in_memory = True
+        self.config.checkpoint_interval = 5
+
+        # Create test evaluation file
+        self.eval_content = """
+def evaluate(program_path):
+    return {"score": 0.5}
+"""
+        self.eval_file = os.path.join(self.test_dir, "evaluator.py")
+        with open(self.eval_file, "w") as f:
+            f.write(self.eval_content)
+
+        # Create test database
+        self.database = ProgramDatabase(self.config.database)
+
+        # Add some test programs
+        for i in range(2):
+            program = Program(
+                id=f"test_{i}",
+                code=f"def func_{i}(): return {i}",
+                language="python",
+                metrics={"score": 0.5},
+                iteration_found=0,
+            )
+            self.database.add(program)
+
+    def tearDown(self):
+        """Clean up test environment"""
+        import shutil
+
+        shutil.rmtree(self.test_dir, ignore_errors=True)
+
+    def test_controller_has_recovery_tracking(self):
+        """Test that controller initializes with recovery tracking attributes"""
+        controller = ProcessParallelController(self.config, self.eval_file, self.database)
+
+        self.assertEqual(controller.recovery_attempts, 0)
+        self.assertEqual(controller.max_recovery_attempts, 3)
+
+    def test_recover_process_pool_recreates_executor(self):
+        """Test that _recover_process_pool recreates the executor"""
+        controller = ProcessParallelController(self.config, self.eval_file, self.database)
+
+        # Start the controller to create initial executor
+        controller.start()
+        self.assertIsNotNone(controller.executor)
+        original_executor = controller.executor
+
+        # Simulate recovery
+        with patch("time.sleep"):
+            controller._recover_process_pool()
+
+        # Verify executor was recreated
+        self.assertIsNotNone(controller.executor)
+        self.assertIsNot(controller.executor, original_executor)
+
+        # Clean up
+        controller.stop()
+
+    def test_broken_executor_triggers_recovery_and_resets_on_success(self):
+        """Test that BrokenExecutor triggers recovery and counter resets on success"""
+
+        async def run_test():
+            controller = ProcessParallelController(self.config, self.eval_file, self.database)
+
+            # Track recovery calls
+            recovery_called = []
+
+            def mock_recover(failed_iterations=None):
+                recovery_called.append(failed_iterations)
+
+            controller._recover_process_pool = mock_recover
+
+            # First call raises BrokenExecutor, subsequent calls succeed
+            call_count = [0]
+
+            def mock_submit(iteration, island_id):
+                call_count[0] += 1
+                mock_future = MagicMock(spec=Future)
+
+                if call_count[0] == 1:
+                    # First future raises BrokenExecutor when result() is called
+                    mock_future.done.return_value = True
+                    mock_future.result.side_effect = BrokenExecutor("Pool crashed")
+                else:
+                    # Subsequent calls succeed
+                    mock_result = SerializableResult(
+                        child_program_dict={
+                            "id": f"child_{call_count[0]}",
+                            "code": "def evolved(): return 1",
+                            "language": "python",
+                            "parent_id": "test_0",
+                            "generation": 1,
+                            "metrics": {"score": 0.7},
+                            "iteration_found": iteration,
+                            "metadata": {"island": island_id},
+                        },
+                        parent_id="test_0",
+                        iteration_time=0.1,
+                        iteration=iteration,
+                    )
+                    mock_future.done.return_value = True
+                    mock_future.result.return_value = mock_result
+                    mock_future.cancel.return_value = True
+
+                return mock_future
+
+            with patch.object(controller, "_submit_iteration", side_effect=mock_submit):
+                controller.start()
+
+                # Run evolution - should recover from crash and reset counter on success
+                await controller.run_evolution(
+                    start_iteration=1, max_iterations=2, target_score=None
+                )
+
+            # Verify recovery was triggered
+            self.assertEqual(len(recovery_called), 1)
+            # Verify counter was reset after successful iteration
+            self.assertEqual(controller.recovery_attempts, 0)
+
+        asyncio.run(run_test())
+
+
+if __name__ == "__main__":
+    unittest.main()