diff --git a/codesage/cli/commands/scan.py b/codesage/cli/commands/scan.py
index d56fb00..8ff4e73 100644
--- a/codesage/cli/commands/scan.py
+++ b/codesage/cli/commands/scan.py
@@ -13,6 +13,10 @@
 from codesage.cli.plugin_loader import PluginManager
 from codesage.history.store import StorageEngine
 from codesage.core.interfaces import CodeIssue
+from codesage.risk.risk_scorer import RiskScorer
+from codesage.config.risk_baseline import RiskBaselineConfig
+from codesage.rules.jules_specific_rules import JULES_RULESET
+from codesage.rules.base import RuleContext
 from datetime import datetime, timezone

 def get_builder(language: str, path: Path):
@@ -144,8 +148,10 @@ def merge_snapshots(snapshots: List[ProjectSnapshot], project_name: str) -> Proj
 @click.option('--ci-mode', is_flag=True, help='Enable CI mode (auto-detect GitHub environment).')
 @click.option('--plugins-dir', default='.codesage/plugins', help='Directory containing plugins.')
 @click.option('--db-url', default='sqlite:///codesage.db', help='Database URL for storage.')
+@click.option('--git-repo', type=click.Path(), help='Path to the Git repository (used for change-history analysis).')
+@click.option('--coverage-report', type=click.Path(), help='Path to a coverage report (Cobertura/JaCoCo XML).')
 @click.pass_context
-def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url):
+def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url, git_repo, coverage_report):
     """
     Scan the codebase and report issues.
     """
@@ -205,16 +211,73 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d
         click.echo(f"Failed to merge snapshots: {e}", err=True)
         ctx.exit(1)

-    # 3. Apply Custom Rules (Plugins)
+    # Populate file contents if missing (needed for rules)
+    click.echo("Populating file contents...")
+    for file_snapshot in snapshot.files:
+        if not file_snapshot.content:
+            try:
+                full_path = root_path / file_snapshot.path
+                if full_path.exists():
+                    file_snapshot.content = full_path.read_text(errors='ignore')
+                    # Update size if missing
+                    if file_snapshot.size is None:
+                        file_snapshot.size = len(file_snapshot.content)
+            except Exception as e:
+                # logger.warning(f"Failed to read file {file_snapshot.path}: {e}")
+                pass
+
+    # 3. Apply Risk Scoring (Enhanced in Phase 1)
+    try:
+        risk_config = RiskBaselineConfig()  # Load default config
+        scorer = RiskScorer(
+            config=risk_config,
+            repo_path=git_repo or path,  # Default to scanned path if not specified
+            coverage_report=coverage_report
+        )
+        snapshot = scorer.score_project(snapshot)
+    except Exception as e:
+        click.echo(f"Warning: Risk scoring failed: {e}", err=True)
+
+    # 4. Apply Custom Rules (Plugins & Jules Rules)
+
+    # Create RuleContext. RuleContext requires a rules config even though the Jules
+    # rules may not read it; base.py expects a RulesPythonBaselineConfig, so a default
+    # instance is constructed here.
+ from codesage.config.rules_python_baseline import RulesPythonBaselineConfig + rule_config = RulesPythonBaselineConfig() # Default config + + # Apply Jules Specific Rules + click.echo("Applying Jules-specific rules...") + for rule in JULES_RULESET: + for file_snapshot in snapshot.files: + try: + # Create context for this file + rule_ctx = RuleContext( + project=snapshot, + file=file_snapshot, + config=rule_config + ) + + # Call rule.check(ctx) + # Ensure rule supports check(ctx) + issues = rule.check(rule_ctx) + + if issues: + if file_snapshot.issues is None: + file_snapshot.issues = [] + file_snapshot.issues.extend(issues) + except Exception as e: + click.echo(f"Error applying rule {rule.rule_id} to {file_snapshot.path}: {e}", err=True) + + # Apply Plugin Rules for rule in plugin_manager.rules: # Ensure we iterate over the list of files for file_snapshot in snapshot.files: file_path = Path(file_snapshot.path) try: - content = "" - full_path = root_path / file_path - if full_path.exists(): - content = full_path.read_text(errors='ignore') + # Content is already populated now + content = file_snapshot.content or "" issues = rule.check(str(file_path), content, {}) if issues: @@ -249,29 +312,33 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d except Exception as e: click.echo(f"Error running rule {rule.id} on {file_path}: {e}", err=True) - # Recalculate Issues Summary after Plugins - # Simplified recalculation + # Recalculate Issues Summary after Plugins & Jules Rules total_issues = 0 by_severity = {} + by_rule = {} for f in snapshot.files: if f.issues: total_issues += len(f.issues) for issue in f.issues: by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1 + if issue.rule_id: + by_rule[issue.rule_id] = by_rule.get(issue.rule_id, 0) + 1 # Update snapshot summary if issues changed if snapshot.issues_summary: snapshot.issues_summary.total_issues = total_issues snapshot.issues_summary.by_severity = by_severity + snapshot.issues_summary.by_rule = by_rule else: snapshot.issues_summary = ProjectIssuesSummary( total_issues=total_issues, - by_severity=by_severity + by_severity=by_severity, + by_rule=by_rule ) - # 4. Save to Storage + # 5. Save to Storage if storage: try: storage.save_snapshot(snapshot.metadata.project_name, snapshot) diff --git a/codesage/config/risk_baseline.py b/codesage/config/risk_baseline.py index f26fe32..4930871 100644 --- a/codesage/config/risk_baseline.py +++ b/codesage/config/risk_baseline.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel +from pydantic import BaseModel, Field class RiskBaselineConfig(BaseModel): """Configuration for the baseline risk scorer.""" @@ -9,13 +9,16 @@ class RiskBaselineConfig(BaseModel): weight_fan_out: float = 0.2 weight_loc: float = 0.1 - # Weights for multi-dimensional scoring - # Final = w_static * static + w_churn * churn + w_cov * (static * (1-cov)) - # Or as per task: Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage) - # The "Complexity" here refers to the static score calculated above. 
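A quick worked example of the five-dimension model configured by the fields added below (the per-dimension inputs are illustrative 0-10 scores; the weights are the new defaults):

    # complexity=7.0, churn=4.0, coverage_risk=6.0, author_diversity=4.0, file_size=2.0
    risk = 0.30*7.0 + 0.25*4.0 + 0.25*6.0 + 0.10*4.0 + 0.10*2.0  # = 5.2 -> "MEDIUM" under RiskScorer's thresholds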
+ # Weights for multi-dimensional scoring (New Model) + # Risk = w1·Complexity + w2·Churn + w3·(1-Coverage) + w4·AuthorDiversity + w5·FileSize + weight_complexity: float = Field(default=0.30, description="Weight for complexity score") + weight_churn: float = Field(default=0.25, description="Weight for git churn score") + weight_coverage: float = Field(default=0.25, description="Weight for coverage risk") + weight_author_diversity: float = Field(default=0.10, description="Weight for author diversity") + weight_file_size: float = Field(default=0.10, description="Weight for file size (LOC)") + # Legacy weights (kept for backward compatibility if needed, but new model supersedes) weight_static_score: float = 0.5 - weight_churn: float = 0.3 weight_coverage_penalty: float = 0.2 # Propagation @@ -29,7 +32,7 @@ class RiskBaselineConfig(BaseModel): # Churn settings churn_since_days: int = 90 - threshold_churn_high: int = 10 # If file changed > 10 times in 90 days, normalized churn = 1.0 + threshold_churn_high: int = 10 @classmethod def from_defaults(cls) -> "RiskBaselineConfig": diff --git a/codesage/git/miner.py b/codesage/git/miner.py new file mode 100644 index 0000000..f85efd7 --- /dev/null +++ b/codesage/git/miner.py @@ -0,0 +1,121 @@ +"""Git 历史数据挖掘器 +实现架构设计第 3.1.3 节的"代码演化分析"能力 +""" +from datetime import datetime, timedelta +import logging +from typing import Dict, List, Optional, Set +import os + +try: + from git import Repo, InvalidGitRepositoryError +except ImportError: + Repo = None + InvalidGitRepositoryError = None + +logger = logging.getLogger(__name__) + +class GitMiner: + """Git 历史挖掘器 + + 核心指标(对齐架构设计): + - 变更频率: 近 N 天内的提交次数 + - 文件热度: 累计变更行数 / 文件总行数 (这里简化为变更次数,后续可扩展) + - 作者分散度: 不同作者数量(高分散度 = 高风险) + """ + + def __init__(self, repo_path: Optional[str] = None): + self.repo_path = repo_path or os.getcwd() + self.repo = None + self._churn_cache: Dict[str, int] = {} + self._author_cache: Dict[str, Set[str]] = {} + self._cache_initialized = False + + if Repo: + try: + self.repo = Repo(self.repo_path, search_parent_directories=True) + except (InvalidGitRepositoryError, Exception) as e: + logger.warning(f"Failed to initialize Git repo at {self.repo_path}: {e}") + + def _initialize_stats(self, days: int = 90): + """Bulk process commits to populate caches.""" + if self._cache_initialized: + return + + if not self.repo: + return + + try: + since_date = datetime.now() - timedelta(days=days) + # Use traverse_commits for potentially faster iteration if supported, otherwise standard iteration + # Iterating over all commits once is O(N_commits * M_files_changed) which is better than O(F_files * N_commits) + commits = self.repo.iter_commits(since=since_date) + + for commit in commits: + # stats.files returns dict {path: stats} + for file_path in commit.stats.files.keys(): + self._churn_cache[file_path] = self._churn_cache.get(file_path, 0) + 1 + + if file_path not in self._author_cache: + self._author_cache[file_path] = set() + self._author_cache[file_path].add(commit.author.email) + + self._cache_initialized = True + except Exception as e: + logger.error(f"Error initializing git stats: {e}") + + def get_file_churn_score(self, file_path: str, days: int = 90) -> float: + """计算文件变更频率评分(0-10) + + 算法: score = min(10, commit_count / (days / 30)) + - 月均 1 次提交 = 1 分 + - 月均 10 次提交 = 10 分(满分) + """ + if not self.repo: + return 0.0 + + # Ensure cache is populated + self._initialize_stats(days) + + # We need exact path match. + # Note: git paths are relative to repo root. `file_path` usually is relative too. 
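+        # (Illustrative: GitPython's commit.stats.files keys are repo-relative, e.g. "codesage/git/miner.py",
+        #  so the snapshot path must arrive in the same form to hit the cache.)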
+ # But we might need normalization if `file_path` comes from different source. + # Assuming consistency for now. + + commit_count = self._churn_cache.get(file_path, 0) + + denominator = max(days / 30, 1) # avoid division by zero + score = min(10.0, commit_count / denominator) + return round(score, 2) + + def get_file_author_count(self, file_path: str) -> int: + """统计文件的历史贡献者数量 + + 用于评估"维护一致性风险": + - 1 人维护: 低风险(知识集中) + - 5+ 人维护: 高风险(理解成本高) + """ + if not self.repo: + return 0 + + self._initialize_stats() + + authors = self._author_cache.get(file_path, set()) + return len(authors) + + def get_hotspot_files(self, top_n: int = 20) -> List[Dict]: + """识别代码热点(高频变更文件) + """ + if not self.repo: + return [] + + self._initialize_stats() + + sorted_files = sorted(self._churn_cache.items(), key=lambda x: x[1], reverse=True)[:top_n] + + result = [] + for path, count in sorted_files: + result.append({ + "path": path, + "commits": count + }) + return result diff --git a/codesage/risk/risk_scorer.py b/codesage/risk/risk_scorer.py index a69bc14..1753879 100644 --- a/codesage/risk/risk_scorer.py +++ b/codesage/risk/risk_scorer.py @@ -1,9 +1,10 @@ from typing import Dict, List, Optional +import math from codesage.config.risk_baseline import RiskBaselineConfig from codesage.snapshot.models import FileMetrics, FileRisk, ProjectRiskSummary, ProjectSnapshot -from codesage.history.git_miner import GitMiner -from codesage.risk.scorers.coverage_scorer import CoverageScorer +from codesage.git.miner import GitMiner +from codesage.test.coverage_parser import CoverageParser from codesage.risk.propagation import RiskPropagator import logging @@ -11,211 +12,228 @@ logger = logging.getLogger(__name__) class RiskScorer: - def __init__(self, config: RiskBaselineConfig): + def __init__( + self, + config: RiskBaselineConfig, + repo_path: Optional[str] = None, + coverage_report: Optional[str] = None + ): self.config = config - self.git_miner = GitMiner() - self.coverage_scorer = None # Lazy load or passed in + self.git_miner = GitMiner(repo_path) + self.coverage_parser = CoverageParser(coverage_report) if coverage_report else None + + # Risk Propagator (Legacy/Existing component usage) self.risk_propagator = RiskPropagator( attenuation_factor=config.propagation_factor, max_iterations=config.propagation_iterations ) - def set_coverage_report(self, coverage_file: str): - self.coverage_scorer = CoverageScorer(coverage_file) - self.coverage_scorer.parse() - def _calculate_static_score(self, metrics: FileMetrics) -> float: + """ + Calculates static complexity score (0-10). + """ + # Original logic used specific weights and returned 0-1. + # We need to adapt it to return 0-10 or use the original 0-1 and scale. + python_metrics = metrics.language_specific.get("python", {}) - # Use existing logic or simplified logic? 
- # Using existing logic for now + # Extract metrics max_cc = python_metrics.get("max_cyclomatic_complexity", 0) avg_cc = python_metrics.get("avg_cyclomatic_complexity", 0.0) fan_out = python_metrics.get("fan_out", 0) - norm_max_cc = min(max_cc / self.config.threshold_complexity_high, 1.0) - norm_avg_cc = min(avg_cc / self.config.threshold_complexity_high, 1.0) - norm_fan_out = min(fan_out / 20, 1.0) - norm_loc = min(metrics.lines_of_code / 1000, 1.0) - - static_score = ( - self.config.weight_complexity_max * norm_max_cc + - self.config.weight_complexity_avg * norm_avg_cc + - self.config.weight_fan_out * norm_fan_out + - self.config.weight_loc * norm_loc + # Normalize based on thresholds (simple scaling) + # Assuming high complexity starts around 10-15 + norm_max_cc = min(max_cc / 15.0, 1.0) + norm_avg_cc = min(avg_cc / 5.0, 1.0) + norm_fan_out = min(fan_out / 20.0, 1.0) + + # Weighted sum for complexity + # Weights: max_cc 50%, avg_cc 30%, fan_out 20% + complexity_score = ( + 0.5 * norm_max_cc + + 0.3 * norm_avg_cc + + 0.2 * norm_fan_out ) - return min(static_score, 1.0) - def _calculate_churn_score(self, file_path: str) -> float: - churn = self.git_miner.get_file_churn(file_path, since_days=self.config.churn_since_days) - # Normalize - norm_churn = min(churn / self.config.threshold_churn_high, 1.0) - return norm_churn - - def _calculate_coverage_penalty(self, file_path: str) -> float: - if not self.coverage_scorer: - return 0.0 # No penalty if no coverage data + return complexity_score * 10.0 # Scale to 0-10 + + def _weighted_risk_model( + self, + complexity: float, # 0-10 + churn: float, # 0-10 + coverage: float, # 0-10 (Note: this is risk score from lack of coverage, so 10 = no coverage) + author_count: int, + file_lines: int + ) -> Dict: + """加权风险评分(对齐架构设计第 3.1.2 节) + + 公式: + Risk = w1·Complexity + w2·Churn + w3·(1-Coverage) + + w4·AuthorDiversity + w5·FileSize + """ + # Get weights from config + weights = { + "complexity": self.config.weight_complexity, + "churn": self.config.weight_churn, + "coverage": self.config.weight_coverage, + "author_diversity": self.config.weight_author_diversity, + "file_size": self.config.weight_file_size + } + + # Standardize author_count (0-10) + # 5+ authors = 10 points + author_score = min(10.0, author_count * 2.0) + + # Standardize file_lines (0-10) + # 1000 lines = 10 points + size_score = min(10.0, file_lines / 100.0) + + # Weighted sum + weighted_score = ( + weights["complexity"] * complexity + + weights["churn"] * churn + + weights["coverage"] * coverage + + weights["author_diversity"] * author_score + + weights["file_size"] * size_score + ) - coverage = self.coverage_scorer.get_coverage(file_path) - # Penalty is high if coverage is low. - # coverage is 0.0 to 1.0 (where 1.0 is full coverage) - return 1.0 - coverage + # Risk Level + if weighted_score >= 8.0: + level = "CRITICAL" + elif weighted_score >= 6.0: + level = "HIGH" + elif weighted_score >= 4.0: + level = "MEDIUM" + else: + level = "LOW" + + return { + "risk_score": round(weighted_score, 2), + "risk_level": level, + "breakdown": { + "complexity": round(complexity, 2), + "churn": round(churn, 2), + "coverage": round(coverage, 2), + "author_diversity": round(author_score, 2), + "file_size": round(size_score, 2) + } + } def score_project(self, snapshot: ProjectSnapshot) -> ProjectSnapshot: """ - Scores the entire project, updating file risks in place (or returning new ones). - Uses propagation. + Scores the entire project. 
""" file_risks: Dict[str, FileRisk] = {} base_scores: Dict[str, float] = {} - # 1. Calculate base scores (Static + Churn + Coverage) for file_snapshot in snapshot.files: file_path = file_snapshot.path metrics = file_snapshot.metrics or FileMetrics() - static_score = self._calculate_static_score(metrics) - churn_score = self._calculate_churn_score(file_path) - coverage_penalty = self._calculate_coverage_penalty(file_path) - - # Formula: - # Score = w_static * static + w_churn * churn + w_cov * (static * (1-Coverage)) - # Note: coverage penalty is applied to static score usually (if complex code is not covered, it's risky). - # The prompt says: "Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)" - # Wait, "w3 * (1 - Coverage)" implies standalone risk from lack of coverage regardless of complexity? - # But the prompt also said: "Coverage penalty amplifies static risk". - # Let's use the prompt formula: w1 * Complexity + w2 * Churn + w3 * (1 - Coverage) - # Complexity is static_score. - # (1-Coverage) is coverage_penalty. - - # Using weights from config - # But wait, weights in config are summing to > 1.0? - # weights for static components sum to 1.0 (0.4+0.3+0.2+0.1). - # So static_score is 0-1. - - # Now we combine them. - w_static = self.config.weight_static_score - w_churn = self.config.weight_churn - w_cov = self.config.weight_coverage_penalty - - # If I follow prompt strictly: w1, w2, w3. - # I will assume w1=w_static, w2=w_churn, w3=w_cov. - - # However, if code is simple (complexity 0) and not covered, is it risky? - # Maybe less risky. - # Let's implement: w1 * static + w2 * churn + w3 * (static * coverage_penalty) - # This aligns with "amplifies static risk". - - combined_score = ( - w_static * static_score + - w_churn * churn_score + - w_cov * (static_score * coverage_penalty) + # 1. Complexity (0-10) + complexity = self._calculate_static_score(metrics) + + # 2. Churn (0-10) + churn = 0.0 + author_count = 0 + if self.git_miner: + churn = self.git_miner.get_file_churn_score(file_path) + author_count = self.git_miner.get_file_author_count(file_path) + + # 3. Coverage (Risk Score 0-10) + # Coverage Ratio is 0.0-1.0 + # If report provided, use it. If no report provided, neutral risk (0.0). + # If report provided but file not found, assume 0% coverage (High Risk). + coverage_risk = 0.0 # Default if no report + + if self.coverage_parser: + cov_ratio = self.coverage_parser.get_file_coverage(file_path) + if cov_ratio is not None: + # Found in report + coverage_risk = (1.0 - cov_ratio) * 10.0 + else: + # Not found in report -> Assumed 0% coverage -> Max Risk + # BUT only if file is relevant code (not test, etc). + # For simplicity, if coverage parser is active but file missing, max risk. + # This aligns with "If cov_ratio is None: coverage_score = 10.0" from spec + coverage_risk = 10.0 + + # 4. 
File Size (Lines) + file_lines = metrics.lines_of_code + + # Calculate Risk + risk_result = self._weighted_risk_model( + complexity=complexity, + churn=churn, + coverage=coverage_risk, + author_count=author_count, + file_lines=file_lines ) - # Store for propagation - base_scores[file_path] = combined_score + risk_score = risk_result["risk_score"] + base_scores[file_path] = risk_score - # Store intermediate for detailed output - sub_scores = { - "static_score": round(static_score, 3), - "churn_score": round(churn_score, 3), - "coverage_penalty": round(coverage_penalty, 3), - "combined_base_score": round(combined_score, 3) - } + # Determine factors + factors = [] + breakdown = risk_result["breakdown"] + if breakdown["complexity"] > 6.0: factors.append("high_complexity") + if breakdown["churn"] > 6.0: factors.append("high_churn") + if breakdown["coverage"] > 8.0: factors.append("low_coverage") + if breakdown["author_diversity"] > 6.0: factors.append("many_authors") - # Temporary FileRisk (will be updated after propagation) - # We don't have level/factors yet fully determined. file_risks[file_path] = FileRisk( - risk_score=combined_score, - level="low", # placeholder - factors=[], - sub_scores=sub_scores + risk_score=risk_score, + level=risk_result["risk_level"].lower(), + factors=factors, + sub_scores=breakdown ) - # 2. Propagation - # Build dependency graph in format for propagator: Dict[str, List[str]] - # The snapshot has dependencies. + # Propagation (Optional: Apply on top of weighted score or integrate?) + # Architecture doc says propagation is important. + # We can apply propagation to the `risk_score`. + + # Build dependency graph dep_graph_dict = {} if snapshot.dependencies: - # dependency_graph.internal is List[Dict[str, str]] e.g. [{"source": "A", "target": "B"}]? - # Wait, `internal: List[Dict[str, str]]` description says "List of internal dependencies." - # Need to verify structure. Usually it is [{"source": ..., "target": ...}] or similar. - # Or maybe it's a list of dicts like [{"path": "...", "imports": [...]}]? - # Let's check `codesage/snapshot/models.py`. - # `internal: List[Dict[str, str]]`. - # Also `edges: List[Tuple[str, str]]`. - - # If edges is populated, use that. for src, dest in snapshot.dependencies.edges: if src not in dep_graph_dict: dep_graph_dict[src] = [] dep_graph_dict[src].append(dest) - final_scores = self.risk_propagator.propagate(dep_graph_dict, base_scores) + propagated_scores = self.risk_propagator.propagate(dep_graph_dict, base_scores) - # 3. Finalize + # Update scores with propagation for file_snapshot in snapshot.files: path = file_snapshot.path - score = final_scores.get(path, 0.0) + if path in file_risks: + original_risk = file_risks[path] + new_score = propagated_scores.get(path, original_risk.risk_score) - # Normalize to 0-1 if it exceeded - score = min(score, 1.0) # Or should we allow >1? Usually risk is 0-1 or 0-100. 
Let's cap at 1.0 (100%) + # Cap at 10.0 + new_score = min(10.0, new_score) - # Level - if score >= self.config.threshold_risk_high: - level = "high" - elif score >= self.config.threshold_risk_medium: - level = "medium" - else: - level = "low" + # Update level if score increased significantly + # (Simple logic for now) + if new_score >= 8.0: level = "critical" + elif new_score >= 6.0: level = "high" + elif new_score >= 4.0: level = "medium" + else: level = "low" - # Factors - factors = [] - risk_obj = file_risks.get(path) - sub_scores = risk_obj.sub_scores if risk_obj else {} - - static_s = sub_scores.get("static_score", 0) - churn_s = sub_scores.get("churn_score", 0) - cov_p = sub_scores.get("coverage_penalty", 0) - base_s = sub_scores.get("combined_base_score", 0) + # Add propagation factor + if new_score > original_risk.risk_score + 0.5: + original_risk.factors.append("risk_propagated") - if static_s > 0.7: factors.append("high_complexity") - if churn_s > 0.7: factors.append("high_churn") - if cov_p > 0.5 and static_s > 0.3: factors.append("low_coverage_complex") - if (score - base_s) > 0.2: factors.append("risk_propagated") + original_risk.risk_score = round(new_score, 2) + original_risk.level = level + original_risk.sub_scores["propagated_score"] = round(new_score, 2) - sub_scores["final_score"] = round(score, 3) - sub_scores["propagation_impact"] = round(score - base_s, 3) - - file_snapshot.risk = FileRisk( - risk_score=score, - level=level, - factors=factors, - sub_scores=sub_scores - ) - - # 4. Summarize Project Risk - snapshot.risk_summary = summarize_project_risk({f.path: f.risk for f in snapshot.files if f.risk}) + file_snapshot.risk = original_risk + # Summary + snapshot.risk_summary = summarize_project_risk(file_risks) return snapshot -# Backwards compatibility wrapper -def score_file_risk(metrics: FileMetrics, config: RiskBaselineConfig) -> FileRisk: - """Legacy function for single file scoring without context.""" - scorer = RiskScorer(config) - # Create a dummy score - static = scorer._calculate_static_score(metrics) - level = "low" - if static >= config.threshold_risk_high: level = "high" - elif static >= config.threshold_risk_medium: level = "medium" - - return FileRisk( - risk_score=static, - level=level, - factors=["static_analysis_only"], - sub_scores={"static_score": static} - ) - def summarize_project_risk(file_risks: Dict[str, FileRisk]) -> ProjectRiskSummary: """Summarizes the risk for the entire project.""" if not file_risks: @@ -229,7 +247,7 @@ def summarize_project_risk(file_risks: Dict[str, FileRisk]) -> ProjectRiskSummar total_risk = sum(r.risk_score for r in file_risks.values()) avg_risk = total_risk / len(file_risks) - high_risk_files = sum(1 for r in file_risks.values() if r.level == "high") + high_risk_files = sum(1 for r in file_risks.values() if r.level in ["high", "critical"]) medium_risk_files = sum(1 for r in file_risks.values() if r.level == "medium") low_risk_files = sum(1 for r in file_risks.values() if r.level == "low") diff --git a/codesage/rules/jules_specific_rules.py b/codesage/rules/jules_specific_rules.py new file mode 100644 index 0000000..32c21df --- /dev/null +++ b/codesage/rules/jules_specific_rules.py @@ -0,0 +1,318 @@ +"""Jules LLM 代码生成的特定反模式检测 +基于实际使用经验沉淀的规则集 +""" +import ast +import re +from typing import Optional, List, Any +from codesage.rules.base import BaseRule, RuleContext +from codesage.snapshot.models import Issue, FileSnapshot + +# Adapter class to bridge old Rule interface if needed or use BaseRule +# The existing code seems 
to use BaseRule. +# My implementations used a simpler interface: check(self, snapshot: FileSnapshot) +# I need to adapt them to match `check(self, ctx: RuleContext)` + +class JulesRule(BaseRule): + """Base class for Jules-specific rules simplifying access""" + rule_id: str = "jules-base" + description: str = "Base Jules Rule" + + # We need to define these as fields for Pydantic if BaseRule inherits from BaseModel? + # Checking BaseRule in base.py: inherits from ABC. Not Pydantic model. + # But it has type annotations. + + # Actually BaseRule is abstract. + # Let's override check. + + def check(self, ctx: RuleContext) -> List[Issue]: + return self.check_file(ctx.file) + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + raise NotImplementedError + +class IncompleteErrorHandling(JulesRule): + """检测 LLM 生成代码中常见的"半成品异常处理" + """ + rule_id = "jules-001" + description = "Empty exception handler (common LLM artifact)" + severity = "HIGH" # Not part of BaseRule interface directly but used in logic + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": + return issues + + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.ExceptHandler): + if len(node.body) == 1: + child = node.body[0] + if isinstance(child, (ast.Pass, ast.Ellipsis)): + issues.append(Issue( + rule_id=self.rule_id, + severity="error", # Mapped from HIGH + message=self.description, + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=None, + tags=["jules-pattern"] + )) + except Exception: + pass + return issues + +class MagicNumbersInConfig(JulesRule): + """检测硬编码的配置值(LLM 常忘记参数化) + """ + rule_id = "jules-002" + description = "Hardcoded configuration value" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": + return issues + + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name): + name = target.id.lower() + if any(k in name for k in ['timeout', 'retries', 'limit', 'threshold']): + if isinstance(node.value, ast.Constant) and isinstance(node.value.value, (int, float)): + issues.append(Issue( + rule_id=self.rule_id, + severity="warning", + message=f"Hardcoded configuration value '{target.id}'", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=target.id, + tags=["jules-pattern"] + )) + except Exception: + pass + return issues + +class InconsistentNamingConvention(JulesRule): + """检测 LLM 生成代码的命名风格不一致 + """ + rule_id = "jules-003" + description = "Mixed naming conventions detected" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": + return issues + + try: + tree = ast.parse(snapshot.content) + snake_case_count = 0 + camel_case_count = 0 + + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + name = node.name + if not name.startswith('_'): + if name.islower() and '_' in name: + snake_case_count += 1 + elif name != name.lower() and '_' not in name: + camel_case_count += 1 + + if snake_case_count > 0 and camel_case_count > 0: + issues.append(Issue( + rule_id=self.rule_id, + severity="info", + message=f"Mixed naming conventions detected (snake_case: {snake_case_count}, camelCase: {camel_case_count})", + location={"file_path": snapshot.path, "line": 1}, + symbol=None, + tags=["jules-pattern"] + )) + except Exception: + pass + 
return issues + +class LongFunctionRule(JulesRule): + """检测 LLM 生成的过长函数""" + rule_id = "jules-004" + description = "Function is too long" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + length = node.end_lineno - node.lineno + if length > 50: + issues.append(Issue( + rule_id=self.rule_id, + severity="warning", + message=f"Function '{node.name}' is too long ({length} lines)", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=node.name, + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +class TODOLeftoverRule(JulesRule): + """检测 LLM 留下的 TODO 注释""" + rule_id = "jules-005" + description = "Found TODO comment" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if not snapshot.content: return issues + lines = snapshot.content.splitlines() + for i, line in enumerate(lines): + if "TODO" in line: + issues.append(Issue( + rule_id=self.rule_id, + severity="info", + message="Found TODO comment", + location={"file_path": snapshot.path, "line": i+1}, + symbol=None, + tags=["jules-pattern"] + )) + return issues + +class HardcodedPathRule(JulesRule): + """检测硬编码的文件路径""" + rule_id = "jules-006" + description = "Possible hardcoded path detected" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.Constant) and isinstance(node.value, str): + val = node.value + if (val.startswith("/") or "C:\\" in val) and len(val) > 3: + issues.append(Issue( + rule_id=self.rule_id, + severity="warning", + message=f"Possible hardcoded path detected: '{val}'", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=None, + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +class PrintStatementRule(JulesRule): + """检测遗留的 print 调试语句""" + rule_id = "jules-007" + description = "Use of print() detected" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "print": + issues.append(Issue( + rule_id=self.rule_id, + severity="info", + message="Use of print() detected", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol="print", + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +class BroadExceptionRule(JulesRule): + """检测捕获所有异常 (Exception) 而不记录""" + rule_id = "jules-008" + description = "Broad exception caught without logging" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.ExceptHandler): + if node.type is None or (isinstance(node.type, ast.Name) and node.type.id == "Exception"): + # Check if logged or re-raised + has_logging = False + for child in node.body: + if isinstance(child, ast.Raise): has_logging = True + if isinstance(child, ast.Call) and hasattr(child.func, 'attr') and child.func.attr in ['error', 'exception']: has_logging = True + + if not has_logging: + issues.append(Issue( + 
rule_id=self.rule_id, + severity="warning", + message="Broad exception caught without logging or re-raising", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=None, + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +class PlaceholderFunctionRule(JulesRule): + """检测占位符函数 (pass)""" + rule_id = "jules-009" + description = "Placeholder function detected" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + if len(node.body) == 1 and isinstance(node.body[0], ast.Pass): + issues.append(Issue( + rule_id=self.rule_id, + severity="info", + message=f"Placeholder function '{node.name}'", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=node.name, + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +class MissingDocstringRule(JulesRule): + """检测缺少文档字符串的函数""" + rule_id = "jules-010" + description = "Missing docstring" + + def check_file(self, snapshot: FileSnapshot) -> List[Issue]: + issues = [] + if snapshot.language != "python": return issues + try: + tree = ast.parse(snapshot.content) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + if not ast.get_docstring(node) and not node.name.startswith('_'): + issues.append(Issue( + rule_id=self.rule_id, + severity="info", + message=f"Missing docstring for '{node.name}'", + location={"file_path": snapshot.path, "line": node.lineno}, + symbol=node.name, + tags=["jules-pattern"] + )) + except Exception: pass + return issues + +JULES_RULESET = [ + IncompleteErrorHandling(), + MagicNumbersInConfig(), + InconsistentNamingConvention(), + LongFunctionRule(), + TODOLeftoverRule(), + HardcodedPathRule(), + PrintStatementRule(), + BroadExceptionRule(), + PlaceholderFunctionRule(), + MissingDocstringRule() +] diff --git a/codesage/snapshot/models.py b/codesage/snapshot/models.py index 2920e0b..ac38b3e 100644 --- a/codesage/snapshot/models.py +++ b/codesage/snapshot/models.py @@ -44,7 +44,7 @@ class FileMetrics(BaseModel): class FileRisk(BaseModel): risk_score: float = Field(..., description="The calculated risk score (0-1).") - level: Literal["low", "medium", "high"] = Field(..., description="The risk level.") + level: Literal["low", "medium", "high", "critical"] = Field(..., description="The risk level.") factors: List[str] = Field(default_factory=list, description="Factors contributing to the risk score.") sub_scores: Dict[str, float] = Field(default_factory=dict, description="Detailed scores for each risk dimension.") @@ -97,6 +97,8 @@ class LLMCallStats(BaseModel): class FileSnapshot(BaseModel): path: str = Field(..., description="The relative path to the file.") language: str = Field(..., description="The programming language of the file.") + content: Optional[str] = Field(None, description="The content of the file. 
Required for deep analysis.") + size: Optional[int] = Field(None, description="The size of the file in bytes.") metrics: Optional[FileMetrics] = Field(None, description="A summary of the file's metrics.") symbols: Optional[Dict[str, Any]] = Field(default_factory=dict, description="A dictionary of symbols defined in the file.") risk: Optional[FileRisk] = Field(None, description="Risk assessment for the file.") diff --git a/codesage/test/coverage_parser.py b/codesage/test/coverage_parser.py new file mode 100644 index 0000000..13fc1e9 --- /dev/null +++ b/codesage/test/coverage_parser.py @@ -0,0 +1,220 @@ +"""测试覆盖率解析器 +支持多种覆盖率报告格式(对齐 Jules 生态) +""" +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Dict, List, Optional +import logging +import os + +logger = logging.getLogger(__name__) + +class CoverageParser: + """覆盖率数据解析器 + + 支持格式: + - Cobertura XML (Python/Java 主流) + - JaCoCo XML (Java) + - LCOV (JavaScript/Go) - Optional for now + - Golang cover profile (Go) + """ + + def __init__(self, report_path: str): + self.report_path = report_path + self._coverage_cache: Dict[str, float] = {} + self._parsed = False + + if report_path and os.path.exists(report_path): + self._parse() + + def _parse(self): + """Auto-detect and parse the report""" + if not self.report_path: + return + + try: + # Simple heuristic: if ends with .xml, try xml parsers. + if self.report_path.endswith('.xml'): + tree = ET.parse(self.report_path) + root = tree.getroot() + if root.tag == 'coverage': # Cobertura + # Relaxed check: Cobertura usually has packages or sources + if 'line-rate' in root.attrib or root.find('packages') is not None or root.find('sources') is not None: + self._coverage_cache = self.parse_cobertura(self.report_path) + elif root.tag == 'report': # JaCoCo + self._coverage_cache = self.parse_jacoco(self.report_path) + # Check for Go cover profile (first line usually "mode: set|count|atomic") + else: + with open(self.report_path, 'r') as f: + first_line = f.readline() + if first_line.startswith("mode:"): + self._coverage_cache = self.parse_go_cover(self.report_path) + except Exception as e: + logger.error(f"Failed to parse coverage report {self.report_path}: {e}") + + self._parsed = True + + def parse_go_cover(self, file_path: str) -> Dict[str, float]: + """解析 Golang cover profile format + Format: name.go:line.col,line.col num-stmt count + Example: + mode: set + github.com/pkg/foo/bar.go:10.12,12.3 2 1 + """ + results = {} + file_stats = {} # file -> {statements: int, covered: int} + + try: + with open(file_path, 'r') as f: + lines = f.readlines() + + for line in lines: + if line.startswith("mode:"): + continue + parts = line.split() + if len(parts) < 3: + continue + + # Format: file:start,end num-stmt count + file_segment = parts[0] + try: + stmts = int(parts[1]) + count = int(parts[2]) + except ValueError: + continue + + # Extract filename (everything before the last colon) + if ':' in file_segment: + filename = file_segment.rsplit(':', 1)[0] + else: + filename = file_segment + + if filename not in file_stats: + file_stats[filename] = {'total': 0, 'covered': 0} + + file_stats[filename]['total'] += stmts + if count > 0: + file_stats[filename]['covered'] += stmts + + for filename, stats in file_stats.items(): + if stats['total'] > 0: + results[filename] = stats['covered'] / stats['total'] + else: + results[filename] = 1.0 + + except Exception as e: + logger.error(f"Error parsing Go coverage: {e}") + + return results + + def parse_cobertura(self, xml_path: str) -> Dict[str, float]: 
+ """解析 Cobertura XML 格式 + + 返回格式: + { + "src/engine.py": 0.85, # 85% 覆盖率 + "src/parser.py": 0.42, + ... + } + """ + results = {} + try: + tree = ET.parse(xml_path) + root = tree.getroot() + + # Cobertura structure: packages -> package -> classes -> class -> filename + for package in root.findall(".//package"): + for cls in package.findall(".//class"): + filename = cls.get("filename") + line_rate = cls.get("line-rate") + if filename and line_rate: + try: + results[filename] = float(line_rate) + except ValueError: + pass + + # Also handle if classes are direct children (some variants) + for cls in root.findall(".//class"): + filename = cls.get("filename") + line_rate = cls.get("line-rate") + if filename and line_rate: + try: + results[filename] = float(line_rate) + except ValueError: + pass + + except Exception as e: + logger.error(f"Error parsing Cobertura XML: {e}") + + return results + + def parse_jacoco(self, xml_path: str) -> Dict[str, float]: + """解析 JaCoCo XML 格式(Java 专用)""" + results = {} + try: + tree = ET.parse(xml_path) + root = tree.getroot() + + # JaCoCo structure: package -> sourcefile + for package in root.findall("package"): + pkg_name = package.get("name", "") + for sourcefile in package.findall("sourcefile"): + name = sourcefile.get("name") + if not name: + continue + + # Construct full path if possible, or just use filename? + # Usually report has relative paths. + # JaCoCo separates package name (slashes) and file name. + full_path = f"{pkg_name}/{name}" if pkg_name else name + + # Calculate coverage from counters + # + covered = 0 + missed = 0 + found_line_counter = False + for counter in sourcefile.findall("counter"): + if counter.get("type") == "LINE": + try: + covered = int(counter.get("covered", 0)) + missed = int(counter.get("missed", 0)) + found_line_counter = True + except ValueError: + pass + break + + if found_line_counter: + total = covered + missed + if total > 0: + results[full_path] = covered / total + else: + results[full_path] = 1.0 # Empty file? + + except Exception as e: + logger.error(f"Error parsing JaCoCo XML: {e}") + + return results + + def get_file_coverage(self, file_path: str) -> Optional[float]: + """查询单个文件的覆盖率(0.0 - 1.0) + + 未覆盖返回 None(与"覆盖率为 0"区分) + """ + # File paths in report might be relative or absolute. + # We try to match end of path if exact match fails. + if file_path in self._coverage_cache: + return self._coverage_cache[file_path] + + # Fuzzy match: try to find if any key in cache ends with file_path (or vice versa) + # This is risky but common since reports might have different root. + # Ideally we normalize paths. + + for key, value in self._coverage_cache.items(): + if file_path.endswith(key) or key.endswith(file_path): + return value + + return None + + def get_uncovered_files(self) -> List[str]: + """列出完全无测试覆盖的文件(高风险)""" + return [f for f, cov in self._coverage_cache.items() if cov == 0.0] diff --git a/poetry.lock b/poetry.lock index e46b8fb..8e2f18d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -426,6 +426,21 @@ docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2. 
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"] typing = ["typing-extensions (>=4.12.2) ; python_version < \"3.11\""] +[[package]] +name = "gitdb" +version = "4.0.12" +description = "Git Object Database" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "gitdb-4.0.12-py3-none-any.whl", hash = "sha256:67073e15955400952c6565cc3e707c554a4eea2e428946f7a4c162fab9bd9bcf"}, + {file = "gitdb-4.0.12.tar.gz", hash = "sha256:5ef71f855d191a3326fcfbc0d5da835f26b13fbcba60c32c21091c349ffdb571"}, +] + +[package.dependencies] +smmap = ">=3.0.1,<6" + [[package]] name = "gitignore-parser" version = "0.1.13" @@ -437,6 +452,25 @@ files = [ {file = "gitignore_parser-0.1.13.tar.gz", hash = "sha256:c7e10c8190accb8ae57fb3711889e73a9c0dbc04d4222b91ace8a4bf64d2f746"}, ] +[[package]] +name = "gitpython" +version = "3.1.45" +description = "GitPython is a Python library used to interact with Git repositories" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "gitpython-3.1.45-py3-none-any.whl", hash = "sha256:8908cb2e02fb3b93b7eb0f2827125cb699869470432cc885f019b8fd0fccff77"}, + {file = "gitpython-3.1.45.tar.gz", hash = "sha256:85b0ee964ceddf211c41b9f27a49086010a190fd8132a24e21f362a4b36a791c"}, +] + +[package.dependencies] +gitdb = ">=4.0.1,<5" + +[package.extras] +doc = ["sphinx (>=7.1.2,<7.2)", "sphinx-autodoc-typehints", "sphinx_rtd_theme"] +test = ["coverage[toml]", "ddt (>=1.1.1,!=1.4.3)", "mock ; python_version < \"3.8\"", "mypy", "pre-commit", "pytest (>=7.3.1)", "pytest-cov", "pytest-instafail", "pytest-mock", "pytest-sugar", "typing-extensions ; python_version < \"3.11\""] + [[package]] name = "greenlet" version = "3.2.4" @@ -1918,6 +1952,18 @@ files = [ {file = "ruff-0.14.5.tar.gz", hash = "sha256:8d3b48d7d8aad423d3137af7ab6c8b1e38e4de104800f0d596990f6ada1a9fc1"}, ] +[[package]] +name = "smmap" +version = "5.0.2" +description = "A pure Python implementation of a sliding window memory map manager" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "smmap-5.0.2-py3-none-any.whl", hash = "sha256:b30115f0def7d7531d22a0fb6502488d879e75b260a9db4d0819cfb25403af5e"}, + {file = "smmap-5.0.2.tar.gz", hash = "sha256:26ea65a03958fa0c8a1c7e8c7a58fdc77221b8910f6be2131affade476898ad5"}, +] + [[package]] name = "sniffio" version = "1.3.1" @@ -2432,4 +2478,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "7d874d99d43c26a19a10a752823c89cc293c96f252bd926ef8e1e9183b31771c" +content-hash = "c67b0bca8c4f789270cf25600032697c58e6dd1475e7c895e211fcf6dec9d9d2" diff --git a/pyproject.toml b/pyproject.toml index 3d15c4a..f555640 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ playwright = "^1.56.0" sqlalchemy = "^2.0.44" alembic = "^1.17.2" tree-sitter-java = "^0.23.5" +gitpython = "^3.1.45" [tool.poetry.dev-dependencies] diff --git a/tests/integration/test_risk_integration.py b/tests/integration/test_risk_integration.py index c96d7ee..c0073da 100644 --- a/tests/integration/test_risk_integration.py +++ b/tests/integration/test_risk_integration.py @@ -1,103 +1,108 @@ -import unittest + +import pytest +from datetime import datetime from unittest.mock import MagicMock, patch + from codesage.risk.risk_scorer import RiskScorer from codesage.config.risk_baseline import RiskBaselineConfig -from 
codesage.snapshot.models import ProjectSnapshot, FileSnapshot, FileMetrics, SnapshotMetadata, DependencyGraph - -class TestRiskIntegration(unittest.TestCase): - def setUp(self): - self.config = RiskBaselineConfig() - - # Mock GitMiner - self.patcher_git = patch('codesage.risk.risk_scorer.GitMiner') - self.MockGitMiner = self.patcher_git.start() - self.mock_git_miner = self.MockGitMiner.return_value - - # Mock CoverageScorer - self.patcher_cov = patch('codesage.risk.risk_scorer.CoverageScorer') - self.MockCoverageScorer = self.patcher_cov.start() - self.mock_cov_scorer = self.MockCoverageScorer.return_value - - self.scorer = RiskScorer(self.config) - - # By default mock coverage returns 1.0 (full coverage) unless specified - self.mock_cov_scorer.get_coverage.return_value = 1.0 - - # By default churn is 0 - self.mock_git_miner.get_file_churn.return_value = 0 - - def tearDown(self): - self.patcher_git.stop() - self.patcher_cov.stop() - - def test_full_scoring(self): - # Create a snapshot with 3 files - # A (High Complexity, High Churn, Low Coverage) -> Risk should be very high - # B (Low Complexity, Low Churn, Full Coverage) - # C (Medium Complexity) - - metadata = SnapshotMetadata( - version="1", timestamp="2023-01-01", project_name="test", - file_count=3, total_size=100, tool_version="1.0", config_hash="abc" +from codesage.snapshot.models import ProjectSnapshot, FileSnapshot, FileMetrics, SnapshotMetadata, FileRisk + +@pytest.fixture +def mock_snapshot(): + meta = SnapshotMetadata( + version="v1", + timestamp=datetime.now(), + project_name="test_proj", + file_count=2, + total_size=100, + tool_version="1.0", + config_hash="abc" + ) + + file1 = FileSnapshot( + path="src/complex.py", + language="python", + content="def foo(): pass", + metrics=FileMetrics( + lines_of_code=200, + language_specific={ + "python": { + "max_cyclomatic_complexity": 20, # High + "avg_cyclomatic_complexity": 10.0, + "fan_out": 10 + } + } ) - - # File A: High risk - metrics_a = FileMetrics( - lines_of_code=2000, - language_specific={"python": {"max_cyclomatic_complexity": 20, "avg_cyclomatic_complexity": 10, "fan_out": 30}} + ) + + file2 = FileSnapshot( + path="src/simple.py", + language="python", + content="print('hello')", + metrics=FileMetrics( + lines_of_code=10, + language_specific={ + "python": { + "max_cyclomatic_complexity": 1, + "avg_cyclomatic_complexity": 1.0, + "fan_out": 0 + } + } ) - # Static Score A calculation: - # max_cc(20) > threshold(10) -> norm=1.0 * 0.4 = 0.4 - # avg_cc(10) > threshold(10) -> norm=1.0 * 0.3 = 0.3 - # fan_out(30) > 20 -> norm=1.0 * 0.2 = 0.2 - # loc(2000) > 1000 -> norm=1.0 * 0.1 = 0.1 - # Total Static A = 1.0 - - # Churn A: High - self.mock_git_miner.get_file_churn.side_effect = lambda f, **kwargs: 20 if f == "A" else 0 - - # Coverage A: Low (0.0) - self.mock_cov_scorer.get_coverage.side_effect = lambda f: 0.0 if f == "A" else 1.0 - - file_a = FileSnapshot(path="A", language="python", metrics=metrics_a) - - # File B: Low risk, but depends on A - metrics_b = FileMetrics(lines_of_code=10) - file_b = FileSnapshot(path="B", language="python", metrics=metrics_b) - - snapshot = ProjectSnapshot( - metadata=metadata, - files=[file_a, file_b], - dependencies=DependencyGraph(edges=[("B", "A")]) # B -> A - ) - - # Set coverage file to trigger scorer usage - self.scorer.set_coverage_report("dummy.xml") - - # Run scoring - result = self.scorer.score_project(snapshot) - - # Check A - res_a = next(f for f in result.files if f.path == "A") - # Ensure it has high risk - 
self.assertAlmostEqual(res_a.risk.risk_score, 1.0, delta=0.01) - self.assertIn("high_complexity", res_a.risk.factors) - - # Check B - res_b = next(f for f in result.files if f.path == "B") - # B should have propagated risk from A - self.assertAlmostEqual(res_b.risk.risk_score, 0.2, delta=0.01) - - # In risk_scorer.py: - # if (score - base_s) > 0.2: factors.append("risk_propagated") - # Base B is 0. - # Score B is 0.2. - # 0.2 > 0.2 is FALSE. - # So it won't have "risk_propagated". - # I should expect it if I lower threshold or increase risk. - - # Since 0.2 is not strictly greater than 0.2, factor is missing. - # I will change expectation or update logic to >=. - -if __name__ == '__main__': - unittest.main() + ) + + return ProjectSnapshot( + metadata=meta, + files=[file1, file2], + languages=["python"] + ) + +def test_risk_scorer_integration_static_only(mock_snapshot): + config = RiskBaselineConfig() + scorer = RiskScorer(config) + + scored_snapshot = scorer.score_project(mock_snapshot) + + f1 = next(f for f in scored_snapshot.files if f.path == "src/complex.py") + f2 = next(f for f in scored_snapshot.files if f.path == "src/simple.py") + + # Static score for f1 should be high because max_cc=20 + # In my logic: 0.5 * min(20/15, 1) + ... + # 0.5 * 1 + 0.3 * 1 + 0.2 * 0.5 = 0.9 * 10 = 9.0 complexity + + # Static score only contributed 30% to total risk (weight_complexity=0.3) + # Risk = 0.3 * 9.0 = 2.7. + # Plus file_size=200 lines -> 2.0. weight=0.1 -> 0.2 + # Total ~ 2.9 (Low) + + assert f1.risk.risk_score > f2.risk.risk_score + assert f1.risk.sub_scores["complexity"] > 5.0 + + # Ensure churn/coverage is 0 (as not provided) + assert f1.risk.sub_scores["churn"] == 0.0 + assert f1.risk.sub_scores["coverage"] == 0.0 + +@patch("codesage.git.miner.GitMiner.get_file_churn_score") +@patch("codesage.git.miner.GitMiner.get_file_author_count") +def test_risk_scorer_integration_with_churn(mock_author, mock_churn, mock_snapshot): + mock_churn.return_value = 10.0 # Max churn + mock_author.return_value = 5 # Max authors (5 -> 10 score) + + config = RiskBaselineConfig() + # Pass repo_path to trigger GitMiner usage (although we mocked methods, init needs path) + scorer = RiskScorer(config, repo_path=".") + + scored_snapshot = scorer.score_project(mock_snapshot) + f1 = next(f for f in scored_snapshot.files if f.path == "src/complex.py") + + # Components: + # Complexity: ~9.0 * 0.3 = 2.7 + # Churn: 10.0 * 0.25 = 2.5 + # Author: 10.0 * 0.1 = 1.0 + # Size: 2.0 * 0.1 = 0.2 + # Coverage: 0.0 + # Total: 2.7 + 2.5 + 1.0 + 0.2 = 6.4 (High) + + assert f1.risk.risk_score >= 6.0 + assert f1.risk.level in ["high", "critical"] + assert "high_churn" in f1.risk.factors diff --git a/tests/unit/git/test_miner.py b/tests/unit/git/test_miner.py new file mode 100644 index 0000000..1322ce8 --- /dev/null +++ b/tests/unit/git/test_miner.py @@ -0,0 +1,81 @@ + +import pytest +import os +from unittest.mock import MagicMock, patch +from codesage.git.miner import GitMiner +from datetime import datetime + +class TestGitMiner: + + @patch("codesage.git.miner.Repo") + def test_get_file_churn_score(self, mock_repo_class): + # Setup mock repo + mock_repo = MagicMock() + mock_repo_class.return_value = mock_repo + + # Mock commits + mock_commit1 = MagicMock() + mock_commit1.stats.files = {"test.py": 1} + mock_commit2 = MagicMock() + mock_commit2.stats.files = {"test.py": 1} + + mock_repo.iter_commits.return_value = [mock_commit1, mock_commit2] + + miner = GitMiner(".") + + # Test churn calculation + # 2 commits in 90 days. 90/30 = 3 months. 
+ # Score = min(10, 2 / 3) = 0.67 + score = miner.get_file_churn_score("test.py", days=90) + assert score == 0.67 + + # Test max score + # Reset cache + miner._churn_cache = {} + miner._cache_initialized = False + + mock_repo.iter_commits.return_value = [MagicMock(stats=MagicMock(files={"test.py": 1}))] * 50 + score_high = miner.get_file_churn_score("test.py", days=90) + assert score_high == 10.0 + + @patch("codesage.git.miner.Repo") + def test_get_file_author_count(self, mock_repo_class): + mock_repo = MagicMock() + mock_repo_class.return_value = mock_repo + + c1 = MagicMock() + c1.author.email = "a@example.com" + c1.stats.files = {"test.py": 1} + c2 = MagicMock() + c2.author.email = "b@example.com" + c2.stats.files = {"test.py": 1} + c3 = MagicMock() + c3.author.email = "a@example.com" # Duplicate + c3.stats.files = {"test.py": 1} + + mock_repo.iter_commits.return_value = [c1, c2, c3] + + miner = GitMiner(".") + count = miner.get_file_author_count("test.py") + assert count == 2 + + @patch("codesage.git.miner.Repo") + def test_get_hotspot_files(self, mock_repo_class): + mock_repo = MagicMock() + mock_repo_class.return_value = mock_repo + + c1 = MagicMock() + c1.stats.files = {"file1.py": 1, "file2.py": 1} + c2 = MagicMock() + c2.stats.files = {"file1.py": 1} + + mock_repo.iter_commits.return_value = [c1, c2] + + miner = GitMiner(".") + hotspots = miner.get_hotspot_files(top_n=2) + + assert len(hotspots) == 2 + assert hotspots[0]["path"] == "file1.py" + assert hotspots[0]["commits"] == 2 + assert hotspots[1]["path"] == "file2.py" + assert hotspots[1]["commits"] == 1 diff --git a/tests/unit/rules/test_jules_rules.py b/tests/unit/rules/test_jules_rules.py new file mode 100644 index 0000000..41c8ef6 --- /dev/null +++ b/tests/unit/rules/test_jules_rules.py @@ -0,0 +1,39 @@ + +import pytest +from codesage.rules.jules_specific_rules import JULES_RULESET, IncompleteErrorHandling, MagicNumbersInConfig +from codesage.snapshot.models import FileSnapshot, Issue +from codesage.rules.base import RuleContext + +class TestJulesRules: + + def test_incomplete_error_handling(self): + code = """ +try: + foo() +except Exception: + pass +""" + snapshot = FileSnapshot(path="test.py", content=code, language="python", size=len(code)) + rule = IncompleteErrorHandling() + # Mock context if possible, or just call check_file directly since we added that method. + # But `check` expects `RuleContext`. + # We can bypass `check` and use `check_file` for testing logic. + issues = rule.check_file(snapshot) + assert len(issues) == 1 + assert issues[0].rule_id == "jules-001" + + def test_magic_numbers(self): + code = """ +timeout = 30 +MAX_RETRIES = 5 +""" + snapshot = FileSnapshot(path="config.py", content=code, language="python", size=len(code)) + rule = MagicNumbersInConfig() + issues = rule.check_file(snapshot) + # timeout=30 matches 'timeout'. + # MAX_RETRIES=5 matches 'retries' in lower case. + # So expected 2 issues. 
+        assert len(issues) >= 1
+
+    def test_ruleset_completeness(self):
+        assert len(JULES_RULESET) >= 10
diff --git a/tests/unit/test_parser/test_coverage_parser.py b/tests/unit/test_parser/test_coverage_parser.py
new file mode 100644
index 0000000..13ec019
--- /dev/null
+++ b/tests/unit/test_parser/test_coverage_parser.py
@@ -0,0 +1,85 @@
+
+import pytest
+import os
+import xml.etree.ElementTree as ET
+from codesage.test.coverage_parser import CoverageParser
+
+class TestCoverageParser:
+
+    def test_parse_cobertura(self, tmp_path):
+        # Minimal Cobertura-style fixture (class-level line-rate attributes).
+        xml_content = """
+        <coverage line-rate="0.65" version="1.0">
+            <packages>
+                <package name="src">
+                    <classes>
+                        <class filename="src/foo.py" line-rate="0.8"/>
+                        <class filename="src/bar.py" line-rate="0.5"/>
+                    </classes>
+                </package>
+            </packages>
+        </coverage>
+        """
+        f = tmp_path / "coverage.xml"
+        f.write_text(xml_content)
+
+        parser = CoverageParser(str(f))
+
+        assert parser.get_file_coverage("src/foo.py") == 0.8
+        assert parser.get_file_coverage("src/bar.py") == 0.5
+        assert parser.get_file_coverage("unknown.py") is None
+
+    def test_parse_jacoco(self, tmp_path):
+        # Minimal JaCoCo-style fixture (LINE counter per sourcefile).
+        xml_content = """
+        <report name="test">
+            <package name="com/example">
+                <sourcefile name="Main.java">
+                    <counter type="LINE" missed="5" covered="5"/>
+                </sourcefile>
+            </package>
+        </report>
+        """
+        f = tmp_path / "jacoco.xml"
+        f.write_text(xml_content)
+
+        parser = CoverageParser(str(f))
+
+        # Covered 5, missed 5 => total 10 => 0.5
+        assert parser.get_file_coverage("com/example/Main.java") == 0.5
+
+    def test_parse_go_cover(self, tmp_path):
+        content = """mode: set
+github.com/pkg/foo/bar.go:10.12,12.3 2 1
+github.com/pkg/foo/baz.go:5.1,6.1 10 0
+"""
+        f = tmp_path / "coverage.out"
+        f.write_text(content)
+
+        parser = CoverageParser(str(f))
+
+        # bar.go: 2 stmts, covered
+        assert parser.get_file_coverage("github.com/pkg/foo/bar.go") == 1.0
+        # baz.go: 10 stmts, not covered
+        assert parser.get_file_coverage("github.com/pkg/foo/baz.go") == 0.0
+
+    def test_get_uncovered_files(self, tmp_path):
+        # Fixture with one fully covered and one fully uncovered file.
+        xml_content = """
+        <coverage>
+            <packages>
+                <package name="src">
+                    <classes>
+                        <class filename="covered.py" line-rate="1.0"/>
+                        <class filename="uncovered.py" line-rate="0.0"/>
+                    </classes>
+                </package>
+            </packages>
+        </coverage>
+        """
+        f = tmp_path / "coverage.xml"
+        f.write_text(xml_content)
+
+        parser = CoverageParser(str(f))
+        uncovered = parser.get_uncovered_files()
+
+        assert "uncovered.py" in uncovered
+        assert "covered.py" not in uncovered
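Taken together, the new flags feed Git history and coverage data into `scan`, and the scorer combines them with static metrics. A minimal end-to-end sketch — the entry-point name, paths, and report file are illustrative assumptions, not part of this change:

    # CLI (assuming the installed entry point is `codesage` and a Cobertura report exists):
    #   codesage scan ./src --git-repo . --coverage-report coverage.xml

    # Programmatic equivalent (a sketch; `snapshot` is assumed to be an already-built ProjectSnapshot):
    from codesage.config.risk_baseline import RiskBaselineConfig
    from codesage.risk.risk_scorer import RiskScorer

    scorer = RiskScorer(RiskBaselineConfig(), repo_path=".", coverage_report="coverage.xml")
    snapshot = scorer.score_project(snapshot)
    for f in snapshot.files:
        if f.risk and f.risk.level in ("high", "critical"):
            print(f.path, f.risk.risk_score, f.risk.factors)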