86 changes: 81 additions & 5 deletions openevolve/process_parallel.py
@@ -5,14 +5,11 @@
import asyncio
import logging
import multiprocessing as mp
import pickle
import signal
import time
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures import BrokenExecutor, Future, ProcessPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

from openevolve.config import Config
from openevolve.database import Program, ProgramDatabase
@@ -357,6 +354,10 @@ def __init__(
self.num_workers = config.evaluator.parallel_evaluations
self.num_islands = config.database.num_islands

# Recovery tracking for process pool crashes
self.recovery_attempts = 0
self.max_recovery_attempts = 3

logger.info(f"Initialized process parallel controller with {self.num_workers} workers")

def _serialize_config(self, config: Config) -> dict:
@@ -434,6 +435,38 @@ def stop(self) -> None:

logger.info("Stopped process pool")

def _recover_process_pool(self, failed_iterations: list[int] | None = None) -> None:
"""Recover from a crashed process pool by recreating it.

Args:
failed_iterations: List of iteration numbers that failed and need re-queuing
"""
import gc

logger.warning("Process pool crashed, attempting recovery...")

# Shut down the broken executor without waiting (it's already broken)
if self.executor:
try:
self.executor.shutdown(wait=False, cancel_futures=True)
except Exception:
pass # Executor may already be in bad state
self.executor = None

# Force garbage collection to free memory before restarting
gc.collect()

# Brief delay to let system stabilize (memory freed, processes cleaned up)
time.sleep(2.0)

# Recreate the pool
self.start()

if failed_iterations:
logger.info(f"Pool recovered. {len(failed_iterations)} iterations will be re-queued.")
else:
logger.info("Pool recovered successfully.")

def request_shutdown(self) -> None:
"""Request graceful shutdown"""
logger.info("Graceful shutdown requested...")
@@ -559,6 +592,14 @@ async def run_evolution(
# Reconstruct program from dict
child_program = Program(**result.child_program_dict)

# Reset recovery counter on successful iteration
if self.recovery_attempts > 0:
logger.info(
f"Pool stable after recovery, resetting recovery counter "
f"(was {self.recovery_attempts})"
)
self.recovery_attempts = 0

# Add to database with explicit target_island to ensure proper island placement
# This fixes issue #391: children should go to the target island, not inherit
# from the parent (which may be from a different island due to fallback sampling)
@@ -752,6 +793,38 @@ async def run_evolution(
)
# Cancel the future to clean up the process
future.cancel()
except BrokenExecutor as e:
logger.error(f"Process pool crashed during iteration {completed_iteration}: {e}")

# Collect all failed iterations from pending futures
failed_iterations = [completed_iteration] + list(pending_futures.keys())

# Clear pending futures (they're all invalid now)
pending_futures.clear()
for island_id in island_pending:
island_pending[island_id].clear()

# Attempt recovery
self.recovery_attempts += 1
if self.recovery_attempts > self.max_recovery_attempts:
logger.error(
f"Max recovery attempts ({self.max_recovery_attempts}) exceeded. "
f"Stopping evolution."
)
break

self._recover_process_pool(failed_iterations)

# Re-queue failed iterations (distribute across islands)
for i, failed_iter in enumerate(failed_iterations):
if failed_iter < total_iterations:
island_id = i % self.num_islands
future = self._submit_iteration(failed_iter, island_id)
if future:
pending_futures[failed_iter] = future
island_pending[island_id].append(failed_iter)

continue
except Exception as e:
logger.error(f"Error processing result from iteration {completed_iteration}: {e}")

@@ -822,6 +895,9 @@ def _submit_iteration(

return future

except BrokenExecutor:
# Let this propagate up to run_evolution for recovery
raise
except Exception as e:
logger.error(f"Error submitting iteration {iteration}: {e}")
return None
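
For context on the failure mode this change handles: when a worker process dies abruptly (OOM kill, segfault, os._exit), the standard library marks the whole ProcessPoolExecutor as broken, and every pending future.result() raises BrokenProcessPool, a subclass of BrokenExecutor. The short standalone sketch below is not part of this diff (the _crash helper is purely illustrative); it just reproduces the condition that the new except BrokenExecutor branch in run_evolution catches before calling _recover_process_pool.

import os
from concurrent.futures import BrokenExecutor, ProcessPoolExecutor


def _crash():
    # Terminate the worker process without cleanup, simulating an OOM kill or segfault
    os._exit(1)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(_crash)
        try:
            future.result()
        except BrokenExecutor as exc:
            # BrokenProcessPool (a BrokenExecutor subclass) surfaces here; the pool
            # is now unusable and must be recreated, which is what the PR's
            # _recover_process_pool does before re-queuing the failed iterations.
            print(f"pool broke as expected: {exc!r}")
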
157 changes: 157 additions & 0 deletions tests/test_process_pool_recovery.py
@@ -0,0 +1,157 @@
"""
Tests for process pool crash recovery
"""

import asyncio
import os
import tempfile
import unittest
from unittest.mock import MagicMock, patch
from concurrent.futures import BrokenExecutor, Future

# Set dummy API key for testing
os.environ["OPENAI_API_KEY"] = "test"

from openevolve.config import Config
from openevolve.database import Program, ProgramDatabase
from openevolve.process_parallel import ProcessParallelController, SerializableResult


class TestProcessPoolRecovery(unittest.TestCase):
"""Tests for process pool crash recovery"""

def setUp(self):
"""Set up test environment"""
self.test_dir = tempfile.mkdtemp()

# Create test config
self.config = Config()
self.config.max_iterations = 10
self.config.evaluator.parallel_evaluations = 2
self.config.evaluator.timeout = 10
self.config.database.num_islands = 2
self.config.database.in_memory = True
self.config.checkpoint_interval = 5

# Create test evaluation file
self.eval_content = """
def evaluate(program_path):
return {"score": 0.5}
"""
self.eval_file = os.path.join(self.test_dir, "evaluator.py")
with open(self.eval_file, "w") as f:
f.write(self.eval_content)

# Create test database
self.database = ProgramDatabase(self.config.database)

# Add some test programs
for i in range(2):
program = Program(
id=f"test_{i}",
code=f"def func_{i}(): return {i}",
language="python",
metrics={"score": 0.5},
iteration_found=0,
)
self.database.add(program)

def tearDown(self):
"""Clean up test environment"""
import shutil

shutil.rmtree(self.test_dir, ignore_errors=True)

def test_controller_has_recovery_tracking(self):
"""Test that controller initializes with recovery tracking attributes"""
controller = ProcessParallelController(self.config, self.eval_file, self.database)

self.assertEqual(controller.recovery_attempts, 0)
self.assertEqual(controller.max_recovery_attempts, 3)

def test_recover_process_pool_recreates_executor(self):
"""Test that _recover_process_pool recreates the executor"""
controller = ProcessParallelController(self.config, self.eval_file, self.database)

# Start the controller to create initial executor
controller.start()
self.assertIsNotNone(controller.executor)
original_executor = controller.executor

# Simulate recovery
with patch("time.sleep"):
controller._recover_process_pool()

# Verify executor was recreated
self.assertIsNotNone(controller.executor)
self.assertIsNot(controller.executor, original_executor)

# Clean up
controller.stop()

def test_broken_executor_triggers_recovery_and_resets_on_success(self):
"""Test that BrokenExecutor triggers recovery and counter resets on success"""

async def run_test():
controller = ProcessParallelController(self.config, self.eval_file, self.database)

# Track recovery calls
recovery_called = []

def mock_recover(failed_iterations=None):
recovery_called.append(failed_iterations)

controller._recover_process_pool = mock_recover

# First call raises BrokenExecutor, subsequent calls succeed
call_count = [0]

def mock_submit(iteration, island_id):
call_count[0] += 1
mock_future = MagicMock(spec=Future)

if call_count[0] == 1:
# First future raises BrokenExecutor when result() is called
mock_future.done.return_value = True
mock_future.result.side_effect = BrokenExecutor("Pool crashed")
else:
# Subsequent calls succeed
mock_result = SerializableResult(
child_program_dict={
"id": f"child_{call_count[0]}",
"code": "def evolved(): return 1",
"language": "python",
"parent_id": "test_0",
"generation": 1,
"metrics": {"score": 0.7},
"iteration_found": iteration,
"metadata": {"island": island_id},
},
parent_id="test_0",
iteration_time=0.1,
iteration=iteration,
)
mock_future.done.return_value = True
mock_future.result.return_value = mock_result
mock_future.cancel.return_value = True

return mock_future

with patch.object(controller, "_submit_iteration", side_effect=mock_submit):
controller.start()

# Run evolution - should recover from crash and reset counter on success
await controller.run_evolution(
start_iteration=1, max_iterations=2, target_score=None
)

# Verify recovery was triggered
self.assertEqual(len(recovery_called), 1)
# Verify counter was reset after successful iteration
self.assertEqual(controller.recovery_attempts, 0)

asyncio.run(run_test())


if __name__ == "__main__":
unittest.main()
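
A quick note on running the new suite: with openevolve importable (for example after an editable install), the file can be executed directly via its unittest.main() guard, or picked up with the standard runner, e.g. python -m unittest tests/test_process_pool_recovery.py -v. The dummy OPENAI_API_KEY set at the top plus the mocked _submit_iteration keep it self-contained, so no live LLM endpoint should be needed.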