Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codesage/config/governance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ValidationConfig(BaseModel):
},
description="Commands to run tests for different languages."
)
execution_timeout: int = Field(30, description="Timeout in seconds for sandbox execution.")
max_retries: int = Field(3, description="Maximum number of retries for failed validation.")


class GovernanceConfig(BaseModel):
Expand Down
217 changes: 191 additions & 26 deletions codesage/governance/patch_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import difflib
import re
import shutil
import ast
from pathlib import Path
from typing import Optional, Tuple

import structlog

from codesage.analyzers.parser_factory import create_parser

logger = structlog.get_logger()


Expand All @@ -21,22 +24,17 @@ def extract_code_block(self, llm_response: str, language: str = "") -> Optional[
Extracts the content of a markdown code block.
Prioritizes blocks marked with the specific language.
"""
# Pattern for ```language ... ```
# We try to match specifically the requested language first
if language:
pattern = re.compile(rf"```{language}\s*\n(.*?)\n```", re.DOTALL)
match = pattern.search(llm_response)
if match:
return match.group(1)

# Fallback: match any code block
pattern = re.compile(r"```(?:\w+)?\s*\n(.*?)\n```", re.DOTALL)
match = pattern.search(llm_response)
if match:
return match.group(1)

# If no code block is found, we return None to be safe.
# Returning the whole response might risk injecting chat text into source code.
return None

def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bool = True) -> bool:
Expand All @@ -55,16 +53,6 @@ def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bo
shutil.copy2(path, backup_path)
logger.info("Backup created", backup_path=str(backup_path))

# For Phase 1, we assume new_content is the FULL file content
# or we need to compute diff?
# The task says "implement extract_code_block and apply_diff".
# If the LLM returns a full file, we just overwrite.
# If the LLM returns a diff or snippet, we need to handle it.
# For now, let's assume the prompt asks for the FULL file content or we do full replacement.
# If we want to support git-style diffs, we need more complex logic.
# Based on "AC-2: PatchManager can correct parse LLM returned Markdown code block... and replace it to source file",
# I will implement full replacement for now as it's safer for "Apply" than trying to merge snippets without line numbers.

path.write_text(new_content, encoding="utf-8")
logger.info("Patch applied successfully", file_path=str(path))
return True
Expand All @@ -73,10 +61,196 @@ def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bo
logger.error("Failed to apply patch", file_path=str(path), error=str(e))
return False

def create_diff(self, original: str, new: str, filename: str = "file") -> str:
def apply_fuzzy_patch(self, file_path: str | Path, new_code_block: str, target_symbol: str = None) -> bool:
"""
Applies a patch using fuzzy matching logic when exact replacement isn't feasible.
"""
path = Path(file_path)
if not path.exists():
logger.error("File not found for fuzzy patching", file_path=str(path))
return False

try:
original_content = path.read_text(encoding="utf-8")
patched_content = None

if target_symbol:
patched_content = self._replace_symbol(file_path, original_content, target_symbol, new_code_block)
if patched_content:
logger.info("Symbol replaced successfully", symbol=target_symbol)

if not patched_content:
patched_content = self._apply_context_patch(original_content, new_code_block)
if patched_content:
logger.info("Context patch applied successfully")

if not patched_content:
logger.warning("Could not apply fuzzy patch")
return False

language = self._get_language_from_extension(path.suffix)
if language and not self._verify_syntax(patched_content, language):
logger.error("Patched content failed syntax check", language=language)
return False

backup_path = path.with_suffix(path.suffix + ".bak")
if not backup_path.exists():
shutil.copy2(path, backup_path)

path.write_text(patched_content, encoding="utf-8")
return True

except Exception as e:
logger.error("Failed to apply fuzzy patch", file_path=str(path), error=str(e))
return False

def _replace_symbol(self, file_path: str | Path, content: str, symbol_name: str, new_block: str) -> Optional[str]:
"""
Uses simple indentation-based parsing to find and replace a Python function.
"""
Creates a unified diff between original and new content.
path = Path(file_path)
if path.suffix != '.py':
return None # Only Python implemented for P1 regex

lines = content.splitlines(keepends=True)
start_idx = -1
end_idx = -1
current_indent = 0

# Regex to find definition
def_pattern = re.compile(rf"^(\s*)def\s+{re.escape(symbol_name)}\s*\(")

for i, line in enumerate(lines):
match = def_pattern.match(line)
if match:
start_idx = i
current_indent = len(match.group(1))
break

if start_idx == -1:
return None

# Find end: Look for next line with same or less indentation that is NOT empty/comment
# This is naive but works for standard formatting
for i in range(start_idx + 1, len(lines)):
line = lines[i]
if not line.strip() or line.strip().startswith('#'):
continue

# Check indentation
indent = len(line) - len(line.lstrip())
if indent <= current_indent:
end_idx = i
break
else:
end_idx = len(lines) # End of file

# Replace lines[start_idx:end_idx] with new_block
# Ensure new_block ends with newline if needed
if not new_block.endswith('\n'):
new_block += '\n'

new_lines = lines[:start_idx] + [new_block] + lines[end_idx:]
return "".join(new_lines)

def _apply_context_patch(self, original: str, new_block: str) -> Optional[str]:
"""
Uses difflib to find a close match for replacement.
Finds the most similar block in the original content and replaces it.
"""
# Split into lines
original_lines = original.splitlines(keepends=True)
new_lines = new_block.splitlines(keepends=True)

if not new_lines:
return None

# Assumption: The new_block is a modified version of some block in the original.
# We search for the block in original that has the highest similarity to new_block.

best_ratio = 0.0
best_match_start = -1
best_match_end = -1

# Try to find header match
header = new_lines[0].strip()
# If header is empty or just braces, it's hard.
if not header:
return None

candidates = []
for i, line in enumerate(original_lines):
if header in line: # Loose match
candidates.append(i)

# For each candidate start, try to find the end of the block (indentation based)
# and compare similarity.

for start_idx in candidates:
# Determine end_idx based on indentation of start_idx
current_indent = len(original_lines[start_idx]) - len(original_lines[start_idx].lstrip())
end_idx = len(original_lines)

for i in range(start_idx + 1, len(original_lines)):
line = original_lines[i]
if not line.strip() or line.strip().startswith('#'):
continue
indent = len(line) - len(line.lstrip())
if indent <= current_indent:
end_idx = i
break

# Check similarity of this block with new_block
old_block = "".join(original_lines[start_idx:end_idx])
ratio = difflib.SequenceMatcher(None, old_block, new_block).ratio()

if ratio > best_ratio:
best_ratio = ratio
best_match_start = start_idx
best_match_end = end_idx

# Threshold
if best_ratio > 0.6: # Allow some significant changes but ensure it's roughly the same place
# Replace
new_content_lines = original_lines[:best_match_start] + new_lines + original_lines[best_match_end:]

return "".join(new_content_lines)

return None

def _verify_syntax(self, content: str, language: str) -> bool:
if language == "python":
try:
ast.parse(content)
return True
except SyntaxError:
return False
elif language == "go":
try:
parser = create_parser("go")
parser.parse(content)
root = parser.tree.root_node
return not self._has_error_node(root)
except Exception:
return False
return True

def _has_error_node(self, node) -> bool:
if node.type == 'ERROR' or node.is_missing:
return True
for child in node.children:
if self._has_error_node(child):
return True
return False

def _get_language_from_extension(self, ext: str) -> Optional[str]:
if ext in ['.py', '.pyi']:
return 'python'
if ext in ['.go']:
return 'go'
return None

def create_diff(self, original: str, new: str, filename: str = "file") -> str:
diff = difflib.unified_diff(
original.splitlines(keepends=True),
new.splitlines(keepends=True),
Expand All @@ -86,9 +260,6 @@ def create_diff(self, original: str, new: str, filename: str = "file") -> str:
return "".join(diff)

def restore_backup(self, file_path: str | Path) -> bool:
"""
Restores the file from its backup (.bak).
"""
path = Path(file_path)
backup_path = path.with_suffix(path.suffix + ".bak")

Expand All @@ -105,15 +276,9 @@ def restore_backup(self, file_path: str | Path) -> bool:
return False

def revert(self, file_path: str | Path) -> bool:
"""
Alias for restore_backup, used for semantic clarity during rollback.
"""
return self.restore_backup(file_path)

def cleanup_backup(self, file_path: str | Path) -> bool:
"""
Removes the backup file if it exists.
"""
path = Path(file_path)
backup_path = path.with_suffix(path.suffix + ".bak")

Expand Down
57 changes: 48 additions & 9 deletions codesage/governance/sandbox.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import subprocess
import os
import structlog
from typing import Dict, Optional, Tuple
import shlex
from typing import Dict, Optional, Tuple, List

logger = structlog.get_logger()

Expand All @@ -20,21 +21,14 @@ def run(self, command: str | list[str], env: Optional[Dict[str, str]] = None, cw
if env:
run_env.update(env)

# If command is a string, we split it for safety if not using shell=True
# But the user config provides a string template.
# Ideally, we should parse it into arguments.
# For this phase, we will switch to shell=False if list is provided,
# but if string is provided, we might still need shell=True or shlex.split.
# To address security, we use shlex.split if it's a string.
import shlex
if isinstance(command, str):
args = shlex.split(command)
else:
args = command

result = subprocess.run(
args,
shell=False, # Changed to False for security
shell=False,
capture_output=True,
text=True,
timeout=self.timeout,
Expand All @@ -53,3 +47,48 @@ def run(self, command: str | list[str], env: Optional[Dict[str, str]] = None, cw
except Exception as e:
logger.error("Sandbox execution failed", command=command, error=str(e))
return False, str(e)

def run_tests(self, test_files: List[str], language: str) -> Tuple[bool, str]:
"""
Executes tests for the given language and test files.
Automatically constructs the test command.
"""
if not test_files:
return True, "No test files to run."

command = []
if language == "python":
# Assuming pytest is available in the environment
command = ["pytest"] + test_files
elif language == "go":
# Go tests run per package usually, but can target files if in same package.
# Best practice is `go test ./pkg/...` or `go test path/to/file_test.go`
# However, `go test file.go` requires passing all files in the package.
# Safer to find the directory of the test file and run `go test -v ./path/to/dir`
# But if multiple directories, we need multiple commands or one `go test ./...` with patterns.

# For simplicity, let's group by directory
dirs = set(os.path.dirname(f) for f in test_files)
if len(dirs) == 1:
# Single directory
d = list(dirs)[0]
# If d is empty (current dir), use "."
target = d if d else "."
if not target.startswith(".") and not os.path.isabs(target):
target = "./" + target
command = ["go", "test", "-v", target]
else:
# Multiple directories - run for each? Or just list them?
# Go test accepts multiple packages.
targets = []
for d in dirs:
target = d if d else "."
if not target.startswith(".") and not os.path.isabs(target):
target = "./" + target
targets.append(target)
command = ["go", "test", "-v"] + targets
else:
return False, f"Unsupported language for test execution: {language}"

logger.info("Running tests", language=language, command=command)
return self.run(command)
Loading
Loading