diff --git a/codesage/cli/commands/scan.py b/codesage/cli/commands/scan.py
index d56fb00..8ff4e73 100644
--- a/codesage/cli/commands/scan.py
+++ b/codesage/cli/commands/scan.py
@@ -13,6 +13,10 @@
from codesage.cli.plugin_loader import PluginManager
from codesage.history.store import StorageEngine
from codesage.core.interfaces import CodeIssue
+from codesage.risk.risk_scorer import RiskScorer
+from codesage.config.risk_baseline import RiskBaselineConfig
+from codesage.rules.jules_specific_rules import JULES_RULESET
+from codesage.rules.base import RuleContext
from datetime import datetime, timezone
def get_builder(language: str, path: Path):
@@ -144,8 +148,10 @@ def merge_snapshots(snapshots: List[ProjectSnapshot], project_name: str) -> Proj
@click.option('--ci-mode', is_flag=True, help='Enable CI mode (auto-detect GitHub environment).')
@click.option('--plugins-dir', default='.codesage/plugins', help='Directory containing plugins.')
@click.option('--db-url', default='sqlite:///codesage.db', help='Database URL for storage.')
+@click.option('--git-repo', type=click.Path(), help='Path to the Git repository (used for change-history analysis).')
+@click.option('--coverage-report', type=click.Path(), help='Path to a coverage report (Cobertura/JaCoCo XML).')
@click.pass_context
-def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url):
+def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url, git_repo, coverage_report):
"""
Scan the codebase and report issues.
"""
@@ -205,16 +211,73 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d
click.echo(f"Failed to merge snapshots: {e}", err=True)
ctx.exit(1)
- # 3. Apply Custom Rules (Plugins)
+ # Populate file contents if missing (needed for rules)
+ click.echo("Populating file contents...")
+ for file_snapshot in snapshot.files:
+ if not file_snapshot.content:
+ try:
+ full_path = root_path / file_snapshot.path
+ if full_path.exists():
+ file_snapshot.content = full_path.read_text(errors='ignore')
+ # Update size if missing
+ if file_snapshot.size is None:
+ file_snapshot.size = len(file_snapshot.content)
+            except Exception:
+                # Skip unreadable files; rules will simply see empty content.
+                pass
+
+ # 3. Apply Risk Scoring (Enhanced in Phase 1)
+ try:
+ risk_config = RiskBaselineConfig() # Load default config
+ scorer = RiskScorer(
+ config=risk_config,
+ repo_path=git_repo or path, # Default to scanned path if not specified
+ coverage_report=coverage_report
+ )
+ snapshot = scorer.score_project(snapshot)
+ except Exception as e:
+ click.echo(f"Warning: Risk scoring failed: {e}", err=True)
+
+ # 4. Apply Custom Rules (Plugins & Jules Rules)
+
+    # Create RuleContext.
+    # RuleContext requires a rules config; the Jules rules may not use it, but we
+    # supply the default RulesPythonBaselineConfig to satisfy the interface.
+ from codesage.config.rules_python_baseline import RulesPythonBaselineConfig
+ rule_config = RulesPythonBaselineConfig() # Default config
+
+ # Apply Jules Specific Rules
+ click.echo("Applying Jules-specific rules...")
+ for rule in JULES_RULESET:
+ for file_snapshot in snapshot.files:
+ try:
+ # Create context for this file
+ rule_ctx = RuleContext(
+ project=snapshot,
+ file=file_snapshot,
+ config=rule_config
+ )
+
+                # Each Jules rule implements check(ctx) via the JulesRule adapter
+ issues = rule.check(rule_ctx)
+
+ if issues:
+ if file_snapshot.issues is None:
+ file_snapshot.issues = []
+ file_snapshot.issues.extend(issues)
+ except Exception as e:
+ click.echo(f"Error applying rule {rule.rule_id} to {file_snapshot.path}: {e}", err=True)
+
+ # Apply Plugin Rules
for rule in plugin_manager.rules:
# Ensure we iterate over the list of files
for file_snapshot in snapshot.files:
file_path = Path(file_snapshot.path)
try:
- content = ""
- full_path = root_path / file_path
- if full_path.exists():
- content = full_path.read_text(errors='ignore')
+ # Content is already populated now
+ content = file_snapshot.content or ""
issues = rule.check(str(file_path), content, {})
if issues:
@@ -249,29 +312,33 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d
except Exception as e:
click.echo(f"Error running rule {rule.id} on {file_path}: {e}", err=True)
- # Recalculate Issues Summary after Plugins
- # Simplified recalculation
+ # Recalculate Issues Summary after Plugins & Jules Rules
total_issues = 0
by_severity = {}
+ by_rule = {}
for f in snapshot.files:
if f.issues:
total_issues += len(f.issues)
for issue in f.issues:
by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1
+ if issue.rule_id:
+ by_rule[issue.rule_id] = by_rule.get(issue.rule_id, 0) + 1
# Update snapshot summary if issues changed
if snapshot.issues_summary:
snapshot.issues_summary.total_issues = total_issues
snapshot.issues_summary.by_severity = by_severity
+ snapshot.issues_summary.by_rule = by_rule
else:
snapshot.issues_summary = ProjectIssuesSummary(
total_issues=total_issues,
- by_severity=by_severity
+ by_severity=by_severity,
+ by_rule=by_rule
)
- # 4. Save to Storage
+ # 5. Save to Storage
if storage:
try:
storage.save_snapshot(snapshot.metadata.project_name, snapshot)
diff --git a/codesage/config/risk_baseline.py b/codesage/config/risk_baseline.py
index f26fe32..4930871 100644
--- a/codesage/config/risk_baseline.py
+++ b/codesage/config/risk_baseline.py
@@ -1,4 +1,4 @@
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
class RiskBaselineConfig(BaseModel):
"""Configuration for the baseline risk scorer."""
@@ -9,13 +9,16 @@ class RiskBaselineConfig(BaseModel):
weight_fan_out: float = 0.2
weight_loc: float = 0.1
- # Weights for multi-dimensional scoring
- # Final = w_static * static + w_churn * churn + w_cov * (static * (1-cov))
- # Or as per task: Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)
- # The "Complexity" here refers to the static score calculated above.
+ # Weights for multi-dimensional scoring (New Model)
+ # Risk = w1·Complexity + w2·Churn + w3·(1-Coverage) + w4·AuthorDiversity + w5·FileSize
+ weight_complexity: float = Field(default=0.30, description="Weight for complexity score")
+ weight_churn: float = Field(default=0.25, description="Weight for git churn score")
+ weight_coverage: float = Field(default=0.25, description="Weight for coverage risk")
+ weight_author_diversity: float = Field(default=0.10, description="Weight for author diversity")
+ weight_file_size: float = Field(default=0.10, description="Weight for file size (LOC)")
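+    # The five weights above sum to 1.0, so a file that maxes out every dimension
+    # (all sub-scores at 10) reaches the ceiling risk score of 10.0.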
+    # Legacy weights (kept for backward compatibility; superseded by the new model above)
weight_static_score: float = 0.5
- weight_churn: float = 0.3
weight_coverage_penalty: float = 0.2
# Propagation
@@ -29,7 +32,7 @@ class RiskBaselineConfig(BaseModel):
# Churn settings
churn_since_days: int = 90
- threshold_churn_high: int = 10 # If file changed > 10 times in 90 days, normalized churn = 1.0
+ threshold_churn_high: int = 10
@classmethod
def from_defaults(cls) -> "RiskBaselineConfig":
diff --git a/codesage/git/miner.py b/codesage/git/miner.py
new file mode 100644
index 0000000..f85efd7
--- /dev/null
+++ b/codesage/git/miner.py
@@ -0,0 +1,121 @@
+"""Git 历史数据挖掘器
+实现架构设计第 3.1.3 节的"代码演化分析"能力
+"""
+from datetime import datetime, timedelta
+import logging
+from typing import Dict, List, Optional, Set
+import os
+
+try:
+ from git import Repo, InvalidGitRepositoryError
+except ImportError:
+ Repo = None
+ InvalidGitRepositoryError = None
+
+logger = logging.getLogger(__name__)
+
+class GitMiner:
+ """Git 历史挖掘器
+
+ 核心指标(对齐架构设计):
+ - 变更频率: 近 N 天内的提交次数
+ - 文件热度: 累计变更行数 / 文件总行数 (这里简化为变更次数,后续可扩展)
+ - 作者分散度: 不同作者数量(高分散度 = 高风险)
+ """
+
+ def __init__(self, repo_path: Optional[str] = None):
+ self.repo_path = repo_path or os.getcwd()
+ self.repo = None
+ self._churn_cache: Dict[str, int] = {}
+ self._author_cache: Dict[str, Set[str]] = {}
+ self._cache_initialized = False
+
+ if Repo:
+ try:
+ self.repo = Repo(self.repo_path, search_parent_directories=True)
+            except Exception as e:  # includes InvalidGitRepositoryError
+ logger.warning(f"Failed to initialize Git repo at {self.repo_path}: {e}")
+
+ def _initialize_stats(self, days: int = 90):
+ """Bulk process commits to populate caches."""
+ if self._cache_initialized:
+ return
+
+ if not self.repo:
+ return
+
+ try:
+ since_date = datetime.now() - timedelta(days=days)
+            # Walk the commit history once and aggregate per-file stats:
+            # O(commits x files-changed-per-commit), which beats a per-file scan of O(files x commits).
+ commits = self.repo.iter_commits(since=since_date)
+
+ for commit in commits:
+ # stats.files returns dict {path: stats}
+ for file_path in commit.stats.files.keys():
+ self._churn_cache[file_path] = self._churn_cache.get(file_path, 0) + 1
+
+ if file_path not in self._author_cache:
+ self._author_cache[file_path] = set()
+ self._author_cache[file_path].add(commit.author.email)
+
+ self._cache_initialized = True
+ except Exception as e:
+ logger.error(f"Error initializing git stats: {e}")
+
+ def get_file_churn_score(self, file_path: str, days: int = 90) -> float:
+ """计算文件变更频率评分(0-10)
+
+ 算法: score = min(10, commit_count / (days / 30))
+ - 月均 1 次提交 = 1 分
+ - 月均 10 次提交 = 10 分(满分)
+ """
+ if not self.repo:
+ return 0.0
+
+ # Ensure cache is populated
+ self._initialize_stats(days)
+
+        # Exact path match: git reports paths relative to the repo root, and
+        # `file_path` is expected to be relative too. Paths from other sources
+        # may need normalization; consistency is assumed for now.
+
+ commit_count = self._churn_cache.get(file_path, 0)
+
+ denominator = max(days / 30, 1) # avoid division by zero
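+        # e.g. 6 commits in the last 90 days -> min(10, 6 / 3) = 2.0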
+ score = min(10.0, commit_count / denominator)
+ return round(score, 2)
+
+ def get_file_author_count(self, file_path: str) -> int:
+ """统计文件的历史贡献者数量
+
+ 用于评估"维护一致性风险":
+ - 1 人维护: 低风险(知识集中)
+ - 5+ 人维护: 高风险(理解成本高)
+ """
+ if not self.repo:
+ return 0
+
+ self._initialize_stats()
+
+ authors = self._author_cache.get(file_path, set())
+ return len(authors)
+
+ def get_hotspot_files(self, top_n: int = 20) -> List[Dict]:
+ """识别代码热点(高频变更文件)
+ """
+ if not self.repo:
+ return []
+
+ self._initialize_stats()
+
+ sorted_files = sorted(self._churn_cache.items(), key=lambda x: x[1], reverse=True)[:top_n]
+
+ result = []
+ for path, count in sorted_files:
+ result.append({
+ "path": path,
+ "commits": count
+ })
+ return result
diff --git a/codesage/risk/risk_scorer.py b/codesage/risk/risk_scorer.py
index a69bc14..1753879 100644
--- a/codesage/risk/risk_scorer.py
+++ b/codesage/risk/risk_scorer.py
@@ -1,9 +1,10 @@
from typing import Dict, List, Optional
+import math
from codesage.config.risk_baseline import RiskBaselineConfig
from codesage.snapshot.models import FileMetrics, FileRisk, ProjectRiskSummary, ProjectSnapshot
-from codesage.history.git_miner import GitMiner
-from codesage.risk.scorers.coverage_scorer import CoverageScorer
+from codesage.git.miner import GitMiner
+from codesage.test.coverage_parser import CoverageParser
from codesage.risk.propagation import RiskPropagator
import logging
@@ -11,211 +12,228 @@
logger = logging.getLogger(__name__)
class RiskScorer:
- def __init__(self, config: RiskBaselineConfig):
+ def __init__(
+ self,
+ config: RiskBaselineConfig,
+ repo_path: Optional[str] = None,
+ coverage_report: Optional[str] = None
+ ):
self.config = config
- self.git_miner = GitMiner()
- self.coverage_scorer = None # Lazy load or passed in
+ self.git_miner = GitMiner(repo_path)
+ self.coverage_parser = CoverageParser(coverage_report) if coverage_report else None
+
+ # Risk Propagator (Legacy/Existing component usage)
self.risk_propagator = RiskPropagator(
attenuation_factor=config.propagation_factor,
max_iterations=config.propagation_iterations
)
- def set_coverage_report(self, coverage_file: str):
- self.coverage_scorer = CoverageScorer(coverage_file)
- self.coverage_scorer.parse()
-
def _calculate_static_score(self, metrics: FileMetrics) -> float:
+ """
+ Calculates static complexity score (0-10).
+ """
+        # The previous static score was on a 0-1 scale; it is computed the same
+        # way here and then scaled to 0-10 to match the other risk dimensions.
+
python_metrics = metrics.language_specific.get("python", {})
- # Use existing logic or simplified logic?
- # Using existing logic for now
+ # Extract metrics
max_cc = python_metrics.get("max_cyclomatic_complexity", 0)
avg_cc = python_metrics.get("avg_cyclomatic_complexity", 0.0)
fan_out = python_metrics.get("fan_out", 0)
- norm_max_cc = min(max_cc / self.config.threshold_complexity_high, 1.0)
- norm_avg_cc = min(avg_cc / self.config.threshold_complexity_high, 1.0)
- norm_fan_out = min(fan_out / 20, 1.0)
- norm_loc = min(metrics.lines_of_code / 1000, 1.0)
-
- static_score = (
- self.config.weight_complexity_max * norm_max_cc +
- self.config.weight_complexity_avg * norm_avg_cc +
- self.config.weight_fan_out * norm_fan_out +
- self.config.weight_loc * norm_loc
+        # Normalize with simple scaling: max CC around 15 and average CC around 5
+        # are treated as the high end; fan-out saturates at 20.
+ norm_max_cc = min(max_cc / 15.0, 1.0)
+ norm_avg_cc = min(avg_cc / 5.0, 1.0)
+ norm_fan_out = min(fan_out / 20.0, 1.0)
+
+ # Weighted sum for complexity
+ # Weights: max_cc 50%, avg_cc 30%, fan_out 20%
+ complexity_score = (
+ 0.5 * norm_max_cc +
+ 0.3 * norm_avg_cc +
+ 0.2 * norm_fan_out
)
- return min(static_score, 1.0)
- def _calculate_churn_score(self, file_path: str) -> float:
- churn = self.git_miner.get_file_churn(file_path, since_days=self.config.churn_since_days)
- # Normalize
- norm_churn = min(churn / self.config.threshold_churn_high, 1.0)
- return norm_churn
-
- def _calculate_coverage_penalty(self, file_path: str) -> float:
- if not self.coverage_scorer:
- return 0.0 # No penalty if no coverage data
+ return complexity_score * 10.0 # Scale to 0-10
+
+ def _weighted_risk_model(
+ self,
+ complexity: float, # 0-10
+ churn: float, # 0-10
+ coverage: float, # 0-10 (Note: this is risk score from lack of coverage, so 10 = no coverage)
+ author_count: int,
+ file_lines: int
+ ) -> Dict:
+ """加权风险评分(对齐架构设计第 3.1.2 节)
+
+ 公式:
+ Risk = w1·Complexity + w2·Churn + w3·(1-Coverage)
+ + w4·AuthorDiversity + w5·FileSize
+ """
+ # Get weights from config
+ weights = {
+ "complexity": self.config.weight_complexity,
+ "churn": self.config.weight_churn,
+ "coverage": self.config.weight_coverage,
+ "author_diversity": self.config.weight_author_diversity,
+ "file_size": self.config.weight_file_size
+ }
+
+ # Standardize author_count (0-10)
+ # 5+ authors = 10 points
+ author_score = min(10.0, author_count * 2.0)
+
+ # Standardize file_lines (0-10)
+ # 1000 lines = 10 points
+ size_score = min(10.0, file_lines / 100.0)
+
+ # Weighted sum
+ weighted_score = (
+ weights["complexity"] * complexity +
+ weights["churn"] * churn +
+ weights["coverage"] * coverage +
+ weights["author_diversity"] * author_score +
+ weights["file_size"] * size_score
+ )
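+
+        # Worked example with the default weights: complexity 9.0, churn 10.0,
+        # coverage 0.0, author score 10.0, size score 2.0:
+        #   0.30*9.0 + 0.25*10.0 + 0.25*0.0 + 0.10*10.0 + 0.10*2.0 = 6.4 -> HIGH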
- coverage = self.coverage_scorer.get_coverage(file_path)
- # Penalty is high if coverage is low.
- # coverage is 0.0 to 1.0 (where 1.0 is full coverage)
- return 1.0 - coverage
+ # Risk Level
+ if weighted_score >= 8.0:
+ level = "CRITICAL"
+ elif weighted_score >= 6.0:
+ level = "HIGH"
+ elif weighted_score >= 4.0:
+ level = "MEDIUM"
+ else:
+ level = "LOW"
+
+ return {
+ "risk_score": round(weighted_score, 2),
+ "risk_level": level,
+ "breakdown": {
+ "complexity": round(complexity, 2),
+ "churn": round(churn, 2),
+ "coverage": round(coverage, 2),
+ "author_diversity": round(author_score, 2),
+ "file_size": round(size_score, 2)
+ }
+ }
def score_project(self, snapshot: ProjectSnapshot) -> ProjectSnapshot:
"""
- Scores the entire project, updating file risks in place (or returning new ones).
- Uses propagation.
+ Scores the entire project.
"""
file_risks: Dict[str, FileRisk] = {}
base_scores: Dict[str, float] = {}
- # 1. Calculate base scores (Static + Churn + Coverage)
for file_snapshot in snapshot.files:
file_path = file_snapshot.path
metrics = file_snapshot.metrics or FileMetrics()
- static_score = self._calculate_static_score(metrics)
- churn_score = self._calculate_churn_score(file_path)
- coverage_penalty = self._calculate_coverage_penalty(file_path)
-
- # Formula:
- # Score = w_static * static + w_churn * churn + w_cov * (static * (1-Coverage))
- # Note: coverage penalty is applied to static score usually (if complex code is not covered, it's risky).
- # The prompt says: "Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)"
- # Wait, "w3 * (1 - Coverage)" implies standalone risk from lack of coverage regardless of complexity?
- # But the prompt also said: "Coverage penalty amplifies static risk".
- # Let's use the prompt formula: w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)
- # Complexity is static_score.
- # (1-Coverage) is coverage_penalty.
-
- # Using weights from config
- # But wait, weights in config are summing to > 1.0?
- # weights for static components sum to 1.0 (0.4+0.3+0.2+0.1).
- # So static_score is 0-1.
-
- # Now we combine them.
- w_static = self.config.weight_static_score
- w_churn = self.config.weight_churn
- w_cov = self.config.weight_coverage_penalty
-
- # If I follow prompt strictly: w1, w2, w3.
- # I will assume w1=w_static, w2=w_churn, w3=w_cov.
-
- # However, if code is simple (complexity 0) and not covered, is it risky?
- # Maybe less risky.
- # Let's implement: w1 * static + w2 * churn + w3 * (static * coverage_penalty)
- # This aligns with "amplifies static risk".
-
- combined_score = (
- w_static * static_score +
- w_churn * churn_score +
- w_cov * (static_score * coverage_penalty)
+ # 1. Complexity (0-10)
+ complexity = self._calculate_static_score(metrics)
+
+ # 2. Churn (0-10)
+ churn = 0.0
+ author_count = 0
+ if self.git_miner:
+ churn = self.git_miner.get_file_churn_score(file_path)
+ author_count = self.git_miner.get_file_author_count(file_path)
+
+ # 3. Coverage (Risk Score 0-10)
+ # Coverage Ratio is 0.0-1.0
+ # If report provided, use it. If no report provided, neutral risk (0.0).
+ # If report provided but file not found, assume 0% coverage (High Risk).
+ coverage_risk = 0.0 # Default if no report
+
+ if self.coverage_parser:
+ cov_ratio = self.coverage_parser.get_file_coverage(file_path)
+ if cov_ratio is not None:
+ # Found in report
+ coverage_risk = (1.0 - cov_ratio) * 10.0
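+                    # e.g. 40% coverage -> (1 - 0.4) * 10 = 6.0 coverage risk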
+ else:
+                    # Not found in the report -> assume 0% coverage -> maximum risk.
+                    # Ideally this would apply only to relevant production code (not tests);
+                    # for simplicity, any file missing from an active report gets max risk,
+                    # matching the spec: "If cov_ratio is None: coverage_score = 10.0".
+ coverage_risk = 10.0
+
+ # 4. File Size (Lines)
+ file_lines = metrics.lines_of_code
+
+ # Calculate Risk
+ risk_result = self._weighted_risk_model(
+ complexity=complexity,
+ churn=churn,
+ coverage=coverage_risk,
+ author_count=author_count,
+ file_lines=file_lines
)
- # Store for propagation
- base_scores[file_path] = combined_score
+ risk_score = risk_result["risk_score"]
+ base_scores[file_path] = risk_score
- # Store intermediate for detailed output
- sub_scores = {
- "static_score": round(static_score, 3),
- "churn_score": round(churn_score, 3),
- "coverage_penalty": round(coverage_penalty, 3),
- "combined_base_score": round(combined_score, 3)
- }
+ # Determine factors
+ factors = []
+ breakdown = risk_result["breakdown"]
+ if breakdown["complexity"] > 6.0: factors.append("high_complexity")
+ if breakdown["churn"] > 6.0: factors.append("high_churn")
+ if breakdown["coverage"] > 8.0: factors.append("low_coverage")
+ if breakdown["author_diversity"] > 6.0: factors.append("many_authors")
- # Temporary FileRisk (will be updated after propagation)
- # We don't have level/factors yet fully determined.
file_risks[file_path] = FileRisk(
- risk_score=combined_score,
- level="low", # placeholder
- factors=[],
- sub_scores=sub_scores
+ risk_score=risk_score,
+ level=risk_result["risk_level"].lower(),
+ factors=factors,
+ sub_scores=breakdown
)
- # 2. Propagation
- # Build dependency graph in format for propagator: Dict[str, List[str]]
- # The snapshot has dependencies.
+        # Propagation: per the architecture design, risk spreads along dependencies,
+        # so the propagator is applied on top of the weighted base scores.
+
+ # Build dependency graph
dep_graph_dict = {}
if snapshot.dependencies:
- # dependency_graph.internal is List[Dict[str, str]] e.g. [{"source": "A", "target": "B"}]?
- # Wait, `internal: List[Dict[str, str]]` description says "List of internal dependencies."
- # Need to verify structure. Usually it is [{"source": ..., "target": ...}] or similar.
- # Or maybe it's a list of dicts like [{"path": "...", "imports": [...]}]?
- # Let's check `codesage/snapshot/models.py`.
- # `internal: List[Dict[str, str]]`.
- # Also `edges: List[Tuple[str, str]]`.
-
- # If edges is populated, use that.
for src, dest in snapshot.dependencies.edges:
if src not in dep_graph_dict:
dep_graph_dict[src] = []
dep_graph_dict[src].append(dest)
- final_scores = self.risk_propagator.propagate(dep_graph_dict, base_scores)
+ propagated_scores = self.risk_propagator.propagate(dep_graph_dict, base_scores)
- # 3. Finalize
+ # Update scores with propagation
for file_snapshot in snapshot.files:
path = file_snapshot.path
- score = final_scores.get(path, 0.0)
+ if path in file_risks:
+ original_risk = file_risks[path]
+ new_score = propagated_scores.get(path, original_risk.risk_score)
- # Normalize to 0-1 if it exceeded
- score = min(score, 1.0) # Or should we allow >1? Usually risk is 0-1 or 0-100. Let's cap at 1.0 (100%)
+ # Cap at 10.0
+ new_score = min(10.0, new_score)
- # Level
- if score >= self.config.threshold_risk_high:
- level = "high"
- elif score >= self.config.threshold_risk_medium:
- level = "medium"
- else:
- level = "low"
+                # Recompute the level from the propagated score
+ if new_score >= 8.0: level = "critical"
+ elif new_score >= 6.0: level = "high"
+ elif new_score >= 4.0: level = "medium"
+ else: level = "low"
- # Factors
- factors = []
- risk_obj = file_risks.get(path)
- sub_scores = risk_obj.sub_scores if risk_obj else {}
-
- static_s = sub_scores.get("static_score", 0)
- churn_s = sub_scores.get("churn_score", 0)
- cov_p = sub_scores.get("coverage_penalty", 0)
- base_s = sub_scores.get("combined_base_score", 0)
+ # Add propagation factor
+ if new_score > original_risk.risk_score + 0.5:
+ original_risk.factors.append("risk_propagated")
- if static_s > 0.7: factors.append("high_complexity")
- if churn_s > 0.7: factors.append("high_churn")
- if cov_p > 0.5 and static_s > 0.3: factors.append("low_coverage_complex")
- if (score - base_s) > 0.2: factors.append("risk_propagated")
+ original_risk.risk_score = round(new_score, 2)
+ original_risk.level = level
+ original_risk.sub_scores["propagated_score"] = round(new_score, 2)
- sub_scores["final_score"] = round(score, 3)
- sub_scores["propagation_impact"] = round(score - base_s, 3)
-
- file_snapshot.risk = FileRisk(
- risk_score=score,
- level=level,
- factors=factors,
- sub_scores=sub_scores
- )
-
- # 4. Summarize Project Risk
- snapshot.risk_summary = summarize_project_risk({f.path: f.risk for f in snapshot.files if f.risk})
+ file_snapshot.risk = original_risk
+ # Summary
+ snapshot.risk_summary = summarize_project_risk(file_risks)
return snapshot
-# Backwards compatibility wrapper
-def score_file_risk(metrics: FileMetrics, config: RiskBaselineConfig) -> FileRisk:
- """Legacy function for single file scoring without context."""
- scorer = RiskScorer(config)
- # Create a dummy score
- static = scorer._calculate_static_score(metrics)
- level = "low"
- if static >= config.threshold_risk_high: level = "high"
- elif static >= config.threshold_risk_medium: level = "medium"
-
- return FileRisk(
- risk_score=static,
- level=level,
- factors=["static_analysis_only"],
- sub_scores={"static_score": static}
- )
-
def summarize_project_risk(file_risks: Dict[str, FileRisk]) -> ProjectRiskSummary:
"""Summarizes the risk for the entire project."""
if not file_risks:
@@ -229,7 +247,7 @@ def summarize_project_risk(file_risks: Dict[str, FileRisk]) -> ProjectRiskSummar
total_risk = sum(r.risk_score for r in file_risks.values())
avg_risk = total_risk / len(file_risks)
- high_risk_files = sum(1 for r in file_risks.values() if r.level == "high")
+ high_risk_files = sum(1 for r in file_risks.values() if r.level in ["high", "critical"])
medium_risk_files = sum(1 for r in file_risks.values() if r.level == "medium")
low_risk_files = sum(1 for r in file_risks.values() if r.level == "low")
diff --git a/codesage/rules/jules_specific_rules.py b/codesage/rules/jules_specific_rules.py
new file mode 100644
index 0000000..32c21df
--- /dev/null
+++ b/codesage/rules/jules_specific_rules.py
@@ -0,0 +1,318 @@
+"""Jules LLM 代码生成的特定反模式检测
+基于实际使用经验沉淀的规则集
+"""
+import ast
+import re
+from typing import Optional, List, Any
+from codesage.rules.base import BaseRule, RuleContext
+from codesage.snapshot.models import Issue, FileSnapshot
+
+# Adapter layer: the plugin system calls check(ctx: RuleContext), while the Jules
+# rules below are written against a single FileSnapshot. JulesRule bridges the two
+# by forwarding check(ctx) to check_file(ctx.file).
+
+class JulesRule(BaseRule):
+ """Base class for Jules-specific rules simplifying access"""
+ rule_id: str = "jules-base"
+ description: str = "Base Jules Rule"
+
+    # BaseRule (see base.py) is a plain ABC, not a Pydantic model, so simple class
+    # attributes are sufficient. Subclasses implement check_file(); check() adapts
+    # the RuleContext-based interface expected by the rule runner.
+
+ def check(self, ctx: RuleContext) -> List[Issue]:
+ return self.check_file(ctx.file)
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ raise NotImplementedError
+
+class IncompleteErrorHandling(JulesRule):
+ """检测 LLM 生成代码中常见的"半成品异常处理"
+ """
+ rule_id = "jules-001"
+ description = "Empty exception handler (common LLM artifact)"
+ severity = "HIGH" # Not part of BaseRule interface directly but used in logic
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python":
+ return issues
+
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.ExceptHandler):
+ if len(node.body) == 1:
+ child = node.body[0]
+                        # an empty handler body is a lone `pass` or a bare `...` expression
+                        if isinstance(child, ast.Pass) or (isinstance(child, ast.Expr) and isinstance(child.value, ast.Constant) and child.value.value is Ellipsis):
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="error", # Mapped from HIGH
+ message=self.description,
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=None,
+ tags=["jules-pattern"]
+ ))
+ except Exception:
+ pass
+ return issues
+
+class MagicNumbersInConfig(JulesRule):
+ """检测硬编码的配置值(LLM 常忘记参数化)
+ """
+ rule_id = "jules-002"
+ description = "Hardcoded configuration value"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python":
+ return issues
+
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.Assign):
+ for target in node.targets:
+ if isinstance(target, ast.Name):
+ name = target.id.lower()
+ if any(k in name for k in ['timeout', 'retries', 'limit', 'threshold']):
+ if isinstance(node.value, ast.Constant) and isinstance(node.value.value, (int, float)):
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="warning",
+ message=f"Hardcoded configuration value '{target.id}'",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=target.id,
+ tags=["jules-pattern"]
+ ))
+ except Exception:
+ pass
+ return issues
+
+class InconsistentNamingConvention(JulesRule):
+ """检测 LLM 生成代码的命名风格不一致
+ """
+ rule_id = "jules-003"
+ description = "Mixed naming conventions detected"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python":
+ return issues
+
+ try:
+ tree = ast.parse(snapshot.content)
+ snake_case_count = 0
+ camel_case_count = 0
+
+ for node in ast.walk(tree):
+ if isinstance(node, ast.FunctionDef):
+ name = node.name
+ if not name.startswith('_'):
+ if name.islower() and '_' in name:
+ snake_case_count += 1
+ elif name != name.lower() and '_' not in name:
+ camel_case_count += 1
+
+ if snake_case_count > 0 and camel_case_count > 0:
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="info",
+ message=f"Mixed naming conventions detected (snake_case: {snake_case_count}, camelCase: {camel_case_count})",
+ location={"file_path": snapshot.path, "line": 1},
+ symbol=None,
+ tags=["jules-pattern"]
+ ))
+ except Exception:
+ pass
+ return issues
+
+class LongFunctionRule(JulesRule):
+ """检测 LLM 生成的过长函数"""
+ rule_id = "jules-004"
+ description = "Function is too long"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.FunctionDef):
+ length = node.end_lineno - node.lineno
+ if length > 50:
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="warning",
+ message=f"Function '{node.name}' is too long ({length} lines)",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=node.name,
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+class TODOLeftoverRule(JulesRule):
+ """检测 LLM 留下的 TODO 注释"""
+ rule_id = "jules-005"
+ description = "Found TODO comment"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if not snapshot.content: return issues
+ lines = snapshot.content.splitlines()
+ for i, line in enumerate(lines):
+ if "TODO" in line:
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="info",
+ message="Found TODO comment",
+ location={"file_path": snapshot.path, "line": i+1},
+ symbol=None,
+ tags=["jules-pattern"]
+ ))
+ return issues
+
+class HardcodedPathRule(JulesRule):
+ """检测硬编码的文件路径"""
+ rule_id = "jules-006"
+ description = "Possible hardcoded path detected"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.Constant) and isinstance(node.value, str):
+ val = node.value
+ if (val.startswith("/") or "C:\\" in val) and len(val) > 3:
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="warning",
+ message=f"Possible hardcoded path detected: '{val}'",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=None,
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+class PrintStatementRule(JulesRule):
+ """检测遗留的 print 调试语句"""
+ rule_id = "jules-007"
+ description = "Use of print() detected"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "print":
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="info",
+ message="Use of print() detected",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol="print",
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+class BroadExceptionRule(JulesRule):
+ """检测捕获所有异常 (Exception) 而不记录"""
+ rule_id = "jules-008"
+ description = "Broad exception caught without logging"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.ExceptHandler):
+ if node.type is None or (isinstance(node.type, ast.Name) and node.type.id == "Exception"):
+                        # Check whether the handler logs or re-raises. Logging calls appear
+                        # as ast.Call nodes nested inside ast.Expr, so walk the whole handler.
+                        has_logging = False
+                        for child in ast.walk(node):
+                            if isinstance(child, ast.Raise): has_logging = True
+                            if isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute) and child.func.attr in ('error', 'exception'): has_logging = True
+
+ if not has_logging:
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="warning",
+ message="Broad exception caught without logging or re-raising",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=None,
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+class PlaceholderFunctionRule(JulesRule):
+ """检测占位符函数 (pass)"""
+ rule_id = "jules-009"
+ description = "Placeholder function detected"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.FunctionDef):
+ if len(node.body) == 1 and isinstance(node.body[0], ast.Pass):
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="info",
+ message=f"Placeholder function '{node.name}'",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=node.name,
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+class MissingDocstringRule(JulesRule):
+ """检测缺少文档字符串的函数"""
+ rule_id = "jules-010"
+ description = "Missing docstring"
+
+ def check_file(self, snapshot: FileSnapshot) -> List[Issue]:
+ issues = []
+ if snapshot.language != "python": return issues
+ try:
+ tree = ast.parse(snapshot.content)
+ for node in ast.walk(tree):
+ if isinstance(node, ast.FunctionDef):
+ if not ast.get_docstring(node) and not node.name.startswith('_'):
+ issues.append(Issue(
+ rule_id=self.rule_id,
+ severity="info",
+ message=f"Missing docstring for '{node.name}'",
+ location={"file_path": snapshot.path, "line": node.lineno},
+ symbol=node.name,
+ tags=["jules-pattern"]
+ ))
+ except Exception: pass
+ return issues
+
+JULES_RULESET = [
+ IncompleteErrorHandling(),
+ MagicNumbersInConfig(),
+ InconsistentNamingConvention(),
+ LongFunctionRule(),
+ TODOLeftoverRule(),
+ HardcodedPathRule(),
+ PrintStatementRule(),
+ BroadExceptionRule(),
+ PlaceholderFunctionRule(),
+ MissingDocstringRule()
+]
diff --git a/codesage/snapshot/models.py b/codesage/snapshot/models.py
index 2920e0b..ac38b3e 100644
--- a/codesage/snapshot/models.py
+++ b/codesage/snapshot/models.py
@@ -44,7 +44,7 @@ class FileMetrics(BaseModel):
class FileRisk(BaseModel):
risk_score: float = Field(..., description="The calculated risk score (0-1).")
- level: Literal["low", "medium", "high"] = Field(..., description="The risk level.")
+ level: Literal["low", "medium", "high", "critical"] = Field(..., description="The risk level.")
factors: List[str] = Field(default_factory=list, description="Factors contributing to the risk score.")
sub_scores: Dict[str, float] = Field(default_factory=dict, description="Detailed scores for each risk dimension.")
@@ -97,6 +97,8 @@ class LLMCallStats(BaseModel):
class FileSnapshot(BaseModel):
path: str = Field(..., description="The relative path to the file.")
language: str = Field(..., description="The programming language of the file.")
+ content: Optional[str] = Field(None, description="The content of the file. Required for deep analysis.")
+ size: Optional[int] = Field(None, description="The size of the file in bytes.")
metrics: Optional[FileMetrics] = Field(None, description="A summary of the file's metrics.")
symbols: Optional[Dict[str, Any]] = Field(default_factory=dict, description="A dictionary of symbols defined in the file.")
risk: Optional[FileRisk] = Field(None, description="Risk assessment for the file.")
diff --git a/codesage/test/coverage_parser.py b/codesage/test/coverage_parser.py
new file mode 100644
index 0000000..13fc1e9
--- /dev/null
+++ b/codesage/test/coverage_parser.py
@@ -0,0 +1,220 @@
+"""测试覆盖率解析器
+支持多种覆盖率报告格式(对齐 Jules 生态)
+"""
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Dict, List, Optional
+import logging
+import os
+
+logger = logging.getLogger(__name__)
+
+class CoverageParser:
+ """覆盖率数据解析器
+
+ 支持格式:
+ - Cobertura XML (Python/Java 主流)
+ - JaCoCo XML (Java)
+ - LCOV (JavaScript/Go) - Optional for now
+ - Golang cover profile (Go)
+ """
+
+ def __init__(self, report_path: str):
+ self.report_path = report_path
+ self._coverage_cache: Dict[str, float] = {}
+ self._parsed = False
+
+ if report_path and os.path.exists(report_path):
+ self._parse()
+
+ def _parse(self):
+ """Auto-detect and parse the report"""
+ if not self.report_path:
+ return
+
+ try:
+ # Simple heuristic: if ends with .xml, try xml parsers.
+ if self.report_path.endswith('.xml'):
+ tree = ET.parse(self.report_path)
+ root = tree.getroot()
+ if root.tag == 'coverage': # Cobertura
+ # Relaxed check: Cobertura usually has packages or sources
+ if 'line-rate' in root.attrib or root.find('packages') is not None or root.find('sources') is not None:
+ self._coverage_cache = self.parse_cobertura(self.report_path)
+ elif root.tag == 'report': # JaCoCo
+ self._coverage_cache = self.parse_jacoco(self.report_path)
+ # Check for Go cover profile (first line usually "mode: set|count|atomic")
+ else:
+ with open(self.report_path, 'r') as f:
+ first_line = f.readline()
+ if first_line.startswith("mode:"):
+ self._coverage_cache = self.parse_go_cover(self.report_path)
+ except Exception as e:
+ logger.error(f"Failed to parse coverage report {self.report_path}: {e}")
+
+ self._parsed = True
+
+ def parse_go_cover(self, file_path: str) -> Dict[str, float]:
+ """解析 Golang cover profile format
+ Format: name.go:line.col,line.col num-stmt count
+ Example:
+ mode: set
+ github.com/pkg/foo/bar.go:10.12,12.3 2 1
+ """
+ results = {}
+ file_stats = {} # file -> {statements: int, covered: int}
+
+ try:
+ with open(file_path, 'r') as f:
+ lines = f.readlines()
+
+ for line in lines:
+ if line.startswith("mode:"):
+ continue
+ parts = line.split()
+ if len(parts) < 3:
+ continue
+
+ # Format: file:start,end num-stmt count
+ file_segment = parts[0]
+ try:
+ stmts = int(parts[1])
+ count = int(parts[2])
+ except ValueError:
+ continue
+
+ # Extract filename (everything before the last colon)
+ if ':' in file_segment:
+ filename = file_segment.rsplit(':', 1)[0]
+ else:
+ filename = file_segment
+
+ if filename not in file_stats:
+ file_stats[filename] = {'total': 0, 'covered': 0}
+
+ file_stats[filename]['total'] += stmts
+ if count > 0:
+ file_stats[filename]['covered'] += stmts
+
+ for filename, stats in file_stats.items():
+ if stats['total'] > 0:
+ results[filename] = stats['covered'] / stats['total']
+ else:
+ results[filename] = 1.0
+
+ except Exception as e:
+ logger.error(f"Error parsing Go coverage: {e}")
+
+ return results
+
+ def parse_cobertura(self, xml_path: str) -> Dict[str, float]:
+ """解析 Cobertura XML 格式
+
+ 返回格式:
+ {
+ "src/engine.py": 0.85, # 85% 覆盖率
+ "src/parser.py": 0.42,
+ ...
+ }
+ """
+ results = {}
+ try:
+ tree = ET.parse(xml_path)
+ root = tree.getroot()
+
+ # Cobertura structure: packages -> package -> classes -> class -> filename
+ for package in root.findall(".//package"):
+ for cls in package.findall(".//class"):
+ filename = cls.get("filename")
+ line_rate = cls.get("line-rate")
+ if filename and line_rate:
+ try:
+ results[filename] = float(line_rate)
+ except ValueError:
+ pass
+
+            # Also sweep all <class> elements to cover variant layouts (duplicates are simply overwritten)
+ for cls in root.findall(".//class"):
+ filename = cls.get("filename")
+ line_rate = cls.get("line-rate")
+ if filename and line_rate:
+ try:
+ results[filename] = float(line_rate)
+ except ValueError:
+ pass
+
+ except Exception as e:
+ logger.error(f"Error parsing Cobertura XML: {e}")
+
+ return results
+
+ def parse_jacoco(self, xml_path: str) -> Dict[str, float]:
+ """解析 JaCoCo XML 格式(Java 专用)"""
+ results = {}
+ try:
+ tree = ET.parse(xml_path)
+ root = tree.getroot()
+
+ # JaCoCo structure: package -> sourcefile
+ for package in root.findall("package"):
+ pkg_name = package.get("name", "")
+ for sourcefile in package.findall("sourcefile"):
+ name = sourcefile.get("name")
+ if not name:
+ continue
+
+                # JaCoCo reports the package path (slash-separated) and the file name
+                # separately; join them to get a source-relative path.
+ full_path = f"{pkg_name}/{name}" if pkg_name else name
+
+                # Calculate line coverage from the LINE counter
+ covered = 0
+ missed = 0
+ found_line_counter = False
+ for counter in sourcefile.findall("counter"):
+ if counter.get("type") == "LINE":
+ try:
+ covered = int(counter.get("covered", 0))
+ missed = int(counter.get("missed", 0))
+ found_line_counter = True
+ except ValueError:
+ pass
+ break
+
+ if found_line_counter:
+ total = covered + missed
+ if total > 0:
+ results[full_path] = covered / total
+ else:
+ results[full_path] = 1.0 # Empty file?
+
+ except Exception as e:
+ logger.error(f"Error parsing JaCoCo XML: {e}")
+
+ return results
+
+ def get_file_coverage(self, file_path: str) -> Optional[float]:
+ """查询单个文件的覆盖率(0.0 - 1.0)
+
+ 未覆盖返回 None(与"覆盖率为 0"区分)
+ """
+ # File paths in report might be relative or absolute.
+ # We try to match end of path if exact match fails.
+ if file_path in self._coverage_cache:
+ return self._coverage_cache[file_path]
+
+        # Fuzzy match: accept a cache key that ends with file_path (or vice versa).
+        # This is imprecise but necessary, since reports may use a different path root;
+        # ideally paths would be normalized.
+
+ for key, value in self._coverage_cache.items():
+ if file_path.endswith(key) or key.endswith(file_path):
+ return value
+
+ return None
+
+ def get_uncovered_files(self) -> List[str]:
+ """列出完全无测试覆盖的文件(高风险)"""
+ return [f for f, cov in self._coverage_cache.items() if cov == 0.0]
diff --git a/poetry.lock b/poetry.lock
index e46b8fb..8e2f18d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -426,6 +426,21 @@ docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"]
typing = ["typing-extensions (>=4.12.2) ; python_version < \"3.11\""]
+[[package]]
+name = "gitdb"
+version = "4.0.12"
+description = "Git Object Database"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+ {file = "gitdb-4.0.12-py3-none-any.whl", hash = "sha256:67073e15955400952c6565cc3e707c554a4eea2e428946f7a4c162fab9bd9bcf"},
+ {file = "gitdb-4.0.12.tar.gz", hash = "sha256:5ef71f855d191a3326fcfbc0d5da835f26b13fbcba60c32c21091c349ffdb571"},
+]
+
+[package.dependencies]
+smmap = ">=3.0.1,<6"
+
[[package]]
name = "gitignore-parser"
version = "0.1.13"
@@ -437,6 +452,25 @@ files = [
{file = "gitignore_parser-0.1.13.tar.gz", hash = "sha256:c7e10c8190accb8ae57fb3711889e73a9c0dbc04d4222b91ace8a4bf64d2f746"},
]
+[[package]]
+name = "gitpython"
+version = "3.1.45"
+description = "GitPython is a Python library used to interact with Git repositories"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+ {file = "gitpython-3.1.45-py3-none-any.whl", hash = "sha256:8908cb2e02fb3b93b7eb0f2827125cb699869470432cc885f019b8fd0fccff77"},
+ {file = "gitpython-3.1.45.tar.gz", hash = "sha256:85b0ee964ceddf211c41b9f27a49086010a190fd8132a24e21f362a4b36a791c"},
+]
+
+[package.dependencies]
+gitdb = ">=4.0.1,<5"
+
+[package.extras]
+doc = ["sphinx (>=7.1.2,<7.2)", "sphinx-autodoc-typehints", "sphinx_rtd_theme"]
+test = ["coverage[toml]", "ddt (>=1.1.1,!=1.4.3)", "mock ; python_version < \"3.8\"", "mypy", "pre-commit", "pytest (>=7.3.1)", "pytest-cov", "pytest-instafail", "pytest-mock", "pytest-sugar", "typing-extensions ; python_version < \"3.11\""]
+
[[package]]
name = "greenlet"
version = "3.2.4"
@@ -1918,6 +1952,18 @@ files = [
{file = "ruff-0.14.5.tar.gz", hash = "sha256:8d3b48d7d8aad423d3137af7ab6c8b1e38e4de104800f0d596990f6ada1a9fc1"},
]
+[[package]]
+name = "smmap"
+version = "5.0.2"
+description = "A pure Python implementation of a sliding window memory map manager"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+ {file = "smmap-5.0.2-py3-none-any.whl", hash = "sha256:b30115f0def7d7531d22a0fb6502488d879e75b260a9db4d0819cfb25403af5e"},
+ {file = "smmap-5.0.2.tar.gz", hash = "sha256:26ea65a03958fa0c8a1c7e8c7a58fdc77221b8910f6be2131affade476898ad5"},
+]
+
[[package]]
name = "sniffio"
version = "1.3.1"
@@ -2432,4 +2478,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
-content-hash = "7d874d99d43c26a19a10a752823c89cc293c96f252bd926ef8e1e9183b31771c"
+content-hash = "c67b0bca8c4f789270cf25600032697c58e6dd1475e7c895e211fcf6dec9d9d2"
diff --git a/pyproject.toml b/pyproject.toml
index 3d15c4a..f555640 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ playwright = "^1.56.0"
sqlalchemy = "^2.0.44"
alembic = "^1.17.2"
tree-sitter-java = "^0.23.5"
+gitpython = "^3.1.45"
[tool.poetry.dev-dependencies]
diff --git a/tests/integration/test_risk_integration.py b/tests/integration/test_risk_integration.py
index c96d7ee..c0073da 100644
--- a/tests/integration/test_risk_integration.py
+++ b/tests/integration/test_risk_integration.py
@@ -1,103 +1,108 @@
-import unittest
+
+import pytest
+from datetime import datetime
from unittest.mock import MagicMock, patch
+
from codesage.risk.risk_scorer import RiskScorer
from codesage.config.risk_baseline import RiskBaselineConfig
-from codesage.snapshot.models import ProjectSnapshot, FileSnapshot, FileMetrics, SnapshotMetadata, DependencyGraph
-
-class TestRiskIntegration(unittest.TestCase):
- def setUp(self):
- self.config = RiskBaselineConfig()
-
- # Mock GitMiner
- self.patcher_git = patch('codesage.risk.risk_scorer.GitMiner')
- self.MockGitMiner = self.patcher_git.start()
- self.mock_git_miner = self.MockGitMiner.return_value
-
- # Mock CoverageScorer
- self.patcher_cov = patch('codesage.risk.risk_scorer.CoverageScorer')
- self.MockCoverageScorer = self.patcher_cov.start()
- self.mock_cov_scorer = self.MockCoverageScorer.return_value
-
- self.scorer = RiskScorer(self.config)
-
- # By default mock coverage returns 1.0 (full coverage) unless specified
- self.mock_cov_scorer.get_coverage.return_value = 1.0
-
- # By default churn is 0
- self.mock_git_miner.get_file_churn.return_value = 0
-
- def tearDown(self):
- self.patcher_git.stop()
- self.patcher_cov.stop()
-
- def test_full_scoring(self):
- # Create a snapshot with 3 files
- # A (High Complexity, High Churn, Low Coverage) -> Risk should be very high
- # B (Low Complexity, Low Churn, Full Coverage)
- # C (Medium Complexity)
-
- metadata = SnapshotMetadata(
- version="1", timestamp="2023-01-01", project_name="test",
- file_count=3, total_size=100, tool_version="1.0", config_hash="abc"
+from codesage.snapshot.models import ProjectSnapshot, FileSnapshot, FileMetrics, SnapshotMetadata, FileRisk
+
+@pytest.fixture
+def mock_snapshot():
+ meta = SnapshotMetadata(
+ version="v1",
+ timestamp=datetime.now(),
+ project_name="test_proj",
+ file_count=2,
+ total_size=100,
+ tool_version="1.0",
+ config_hash="abc"
+ )
+
+ file1 = FileSnapshot(
+ path="src/complex.py",
+ language="python",
+ content="def foo(): pass",
+ metrics=FileMetrics(
+ lines_of_code=200,
+ language_specific={
+ "python": {
+ "max_cyclomatic_complexity": 20, # High
+ "avg_cyclomatic_complexity": 10.0,
+ "fan_out": 10
+ }
+ }
)
-
- # File A: High risk
- metrics_a = FileMetrics(
- lines_of_code=2000,
- language_specific={"python": {"max_cyclomatic_complexity": 20, "avg_cyclomatic_complexity": 10, "fan_out": 30}}
+ )
+
+ file2 = FileSnapshot(
+ path="src/simple.py",
+ language="python",
+ content="print('hello')",
+ metrics=FileMetrics(
+ lines_of_code=10,
+ language_specific={
+ "python": {
+ "max_cyclomatic_complexity": 1,
+ "avg_cyclomatic_complexity": 1.0,
+ "fan_out": 0
+ }
+ }
)
- # Static Score A calculation:
- # max_cc(20) > threshold(10) -> norm=1.0 * 0.4 = 0.4
- # avg_cc(10) > threshold(10) -> norm=1.0 * 0.3 = 0.3
- # fan_out(30) > 20 -> norm=1.0 * 0.2 = 0.2
- # loc(2000) > 1000 -> norm=1.0 * 0.1 = 0.1
- # Total Static A = 1.0
-
- # Churn A: High
- self.mock_git_miner.get_file_churn.side_effect = lambda f, **kwargs: 20 if f == "A" else 0
-
- # Coverage A: Low (0.0)
- self.mock_cov_scorer.get_coverage.side_effect = lambda f: 0.0 if f == "A" else 1.0
-
- file_a = FileSnapshot(path="A", language="python", metrics=metrics_a)
-
- # File B: Low risk, but depends on A
- metrics_b = FileMetrics(lines_of_code=10)
- file_b = FileSnapshot(path="B", language="python", metrics=metrics_b)
-
- snapshot = ProjectSnapshot(
- metadata=metadata,
- files=[file_a, file_b],
- dependencies=DependencyGraph(edges=[("B", "A")]) # B -> A
- )
-
- # Set coverage file to trigger scorer usage
- self.scorer.set_coverage_report("dummy.xml")
-
- # Run scoring
- result = self.scorer.score_project(snapshot)
-
- # Check A
- res_a = next(f for f in result.files if f.path == "A")
- # Ensure it has high risk
- self.assertAlmostEqual(res_a.risk.risk_score, 1.0, delta=0.01)
- self.assertIn("high_complexity", res_a.risk.factors)
-
- # Check B
- res_b = next(f for f in result.files if f.path == "B")
- # B should have propagated risk from A
- self.assertAlmostEqual(res_b.risk.risk_score, 0.2, delta=0.01)
-
- # In risk_scorer.py:
- # if (score - base_s) > 0.2: factors.append("risk_propagated")
- # Base B is 0.
- # Score B is 0.2.
- # 0.2 > 0.2 is FALSE.
- # So it won't have "risk_propagated".
- # I should expect it if I lower threshold or increase risk.
-
- # Since 0.2 is not strictly greater than 0.2, factor is missing.
- # I will change expectation or update logic to >=.
-
-if __name__ == '__main__':
- unittest.main()
+ )
+
+ return ProjectSnapshot(
+ metadata=meta,
+ files=[file1, file2],
+ languages=["python"]
+ )
+
+def test_risk_scorer_integration_static_only(mock_snapshot):
+ config = RiskBaselineConfig()
+ scorer = RiskScorer(config)
+
+ scored_snapshot = scorer.score_project(mock_snapshot)
+
+ f1 = next(f for f in scored_snapshot.files if f.path == "src/complex.py")
+ f2 = next(f for f in scored_snapshot.files if f.path == "src/simple.py")
+
+    # The static score for f1 should be high because max_cc=20.
+    # With the new scaling: 0.5 * min(20/15, 1) + 0.3 * min(10/5, 1) + 0.2 * min(10/20, 1)
+    #                     = 0.5 * 1 + 0.3 * 1 + 0.2 * 0.5 = 0.9, scaled x10 -> 9.0 complexity
+
+    # Complexity contributes only 30% of the total risk (weight_complexity=0.30):
+    # 0.30 * 9.0 = 2.7, plus file size (200 lines -> score 2.0, weight 0.10 -> 0.2).
+    # Total ~ 2.9 (LOW)
+
+ assert f1.risk.risk_score > f2.risk.risk_score
+ assert f1.risk.sub_scores["complexity"] > 5.0
+
+    # Churn and coverage stay 0: no coverage report was given and these test paths have no git history
+ assert f1.risk.sub_scores["churn"] == 0.0
+ assert f1.risk.sub_scores["coverage"] == 0.0
+
+@patch("codesage.git.miner.GitMiner.get_file_churn_score")
+@patch("codesage.git.miner.GitMiner.get_file_author_count")
+def test_risk_scorer_integration_with_churn(mock_author, mock_churn, mock_snapshot):
+ mock_churn.return_value = 10.0 # Max churn
+ mock_author.return_value = 5 # Max authors (5 -> 10 score)
+
+ config = RiskBaselineConfig()
+ # Pass repo_path to trigger GitMiner usage (although we mocked methods, init needs path)
+ scorer = RiskScorer(config, repo_path=".")
+
+ scored_snapshot = scorer.score_project(mock_snapshot)
+ f1 = next(f for f in scored_snapshot.files if f.path == "src/complex.py")
+
+ # Components:
+ # Complexity: ~9.0 * 0.3 = 2.7
+ # Churn: 10.0 * 0.25 = 2.5
+ # Author: 10.0 * 0.1 = 1.0
+ # Size: 2.0 * 0.1 = 0.2
+ # Coverage: 0.0
+ # Total: 2.7 + 2.5 + 1.0 + 0.2 = 6.4 (High)
+
+ assert f1.risk.risk_score >= 6.0
+ assert f1.risk.level in ["high", "critical"]
+ assert "high_churn" in f1.risk.factors
diff --git a/tests/unit/git/test_miner.py b/tests/unit/git/test_miner.py
new file mode 100644
index 0000000..1322ce8
--- /dev/null
+++ b/tests/unit/git/test_miner.py
@@ -0,0 +1,81 @@
+
+import pytest
+import os
+from unittest.mock import MagicMock, patch
+from codesage.git.miner import GitMiner
+from datetime import datetime
+
+class TestGitMiner:
+
+ @patch("codesage.git.miner.Repo")
+ def test_get_file_churn_score(self, mock_repo_class):
+ # Setup mock repo
+ mock_repo = MagicMock()
+ mock_repo_class.return_value = mock_repo
+
+ # Mock commits
+ mock_commit1 = MagicMock()
+ mock_commit1.stats.files = {"test.py": 1}
+ mock_commit2 = MagicMock()
+ mock_commit2.stats.files = {"test.py": 1}
+
+ mock_repo.iter_commits.return_value = [mock_commit1, mock_commit2]
+
+ miner = GitMiner(".")
+
+ # Test churn calculation
+ # 2 commits in 90 days. 90/30 = 3 months.
+ # Score = min(10, 2 / 3) = 0.67
+ score = miner.get_file_churn_score("test.py", days=90)
+ assert score == 0.67
+
+ # Test max score
+ # Reset cache
+ miner._churn_cache = {}
+ miner._cache_initialized = False
+
+ mock_repo.iter_commits.return_value = [MagicMock(stats=MagicMock(files={"test.py": 1}))] * 50
+ score_high = miner.get_file_churn_score("test.py", days=90)
+ assert score_high == 10.0
+
+ @patch("codesage.git.miner.Repo")
+ def test_get_file_author_count(self, mock_repo_class):
+ mock_repo = MagicMock()
+ mock_repo_class.return_value = mock_repo
+
+ c1 = MagicMock()
+ c1.author.email = "a@example.com"
+ c1.stats.files = {"test.py": 1}
+ c2 = MagicMock()
+ c2.author.email = "b@example.com"
+ c2.stats.files = {"test.py": 1}
+ c3 = MagicMock()
+ c3.author.email = "a@example.com" # Duplicate
+ c3.stats.files = {"test.py": 1}
+
+ mock_repo.iter_commits.return_value = [c1, c2, c3]
+
+ miner = GitMiner(".")
+ count = miner.get_file_author_count("test.py")
+ assert count == 2
+
+ @patch("codesage.git.miner.Repo")
+ def test_get_hotspot_files(self, mock_repo_class):
+ mock_repo = MagicMock()
+ mock_repo_class.return_value = mock_repo
+
+ c1 = MagicMock()
+ c1.stats.files = {"file1.py": 1, "file2.py": 1}
+ c2 = MagicMock()
+ c2.stats.files = {"file1.py": 1}
+
+ mock_repo.iter_commits.return_value = [c1, c2]
+
+ miner = GitMiner(".")
+ hotspots = miner.get_hotspot_files(top_n=2)
+
+ assert len(hotspots) == 2
+ assert hotspots[0]["path"] == "file1.py"
+ assert hotspots[0]["commits"] == 2
+ assert hotspots[1]["path"] == "file2.py"
+ assert hotspots[1]["commits"] == 1
diff --git a/tests/unit/rules/test_jules_rules.py b/tests/unit/rules/test_jules_rules.py
new file mode 100644
index 0000000..41c8ef6
--- /dev/null
+++ b/tests/unit/rules/test_jules_rules.py
@@ -0,0 +1,39 @@
+
+import pytest
+from codesage.rules.jules_specific_rules import JULES_RULESET, IncompleteErrorHandling, MagicNumbersInConfig
+from codesage.snapshot.models import FileSnapshot, Issue
+from codesage.rules.base import RuleContext
+
+class TestJulesRules:
+
+ def test_incomplete_error_handling(self):
+ code = """
+try:
+ foo()
+except Exception:
+ pass
+"""
+ snapshot = FileSnapshot(path="test.py", content=code, language="python", size=len(code))
+ rule = IncompleteErrorHandling()
+        # check() expects a RuleContext; call check_file() directly to exercise
+        # the rule logic in isolation.
+ issues = rule.check_file(snapshot)
+ assert len(issues) == 1
+ assert issues[0].rule_id == "jules-001"
+
+ def test_magic_numbers(self):
+ code = """
+timeout = 30
+MAX_RETRIES = 5
+"""
+ snapshot = FileSnapshot(path="config.py", content=code, language="python", size=len(code))
+ rule = MagicNumbersInConfig()
+ issues = rule.check_file(snapshot)
+        # timeout=30 matches 'timeout' and MAX_RETRIES=5 matches 'retries' (lower-cased),
+        # so two issues are expected; assert at least one to keep the check robust.
+ assert len(issues) >= 1
+
+ def test_ruleset_completeness(self):
+ assert len(JULES_RULESET) >= 10
diff --git a/tests/unit/test_parser/test_coverage_parser.py b/tests/unit/test_parser/test_coverage_parser.py
new file mode 100644
index 0000000..13ec019
--- /dev/null
+++ b/tests/unit/test_parser/test_coverage_parser.py
@@ -0,0 +1,85 @@
+
+import pytest
+import os
+import xml.etree.ElementTree as ET
+from codesage.test.coverage_parser import CoverageParser
+
+class TestCoverageParser:
+
+ def test_parse_cobertura(self, tmp_path):
+ xml_content = """
+
+
+
+
+
+
+
+
+
+
+ """
+ f = tmp_path / "coverage.xml"
+ f.write_text(xml_content)
+
+ parser = CoverageParser(str(f))
+
+ assert parser.get_file_coverage("src/foo.py") == 0.8
+ assert parser.get_file_coverage("src/bar.py") == 0.5
+ assert parser.get_file_coverage("unknown.py") is None
+
+ def test_parse_jacoco(self, tmp_path):
+ xml_content = """
+
+
+
+
+
+
+
+
+ """
+ f = tmp_path / "jacoco.xml"
+ f.write_text(xml_content)
+
+ parser = CoverageParser(str(f))
+
+ # Covered 5, Missed 5 => Total 10 => 0.5
+ assert parser.get_file_coverage("com/example/Main.java") == 0.5
+
+ def test_parse_go_cover(self, tmp_path):
+ content = """mode: set
+github.com/pkg/foo/bar.go:10.12,12.3 2 1
+github.com/pkg/foo/baz.go:5.1,6.1 10 0
+"""
+ f = tmp_path / "coverage.out"
+ f.write_text(content)
+
+ parser = CoverageParser(str(f))
+
+ # bar.go: 2 stmts, covered
+ assert parser.get_file_coverage("github.com/pkg/foo/bar.go") == 1.0
+ # baz.go: 10 stmts, not covered
+ assert parser.get_file_coverage("github.com/pkg/foo/baz.go") == 0.0
+
+ def test_get_uncovered_files(self, tmp_path):
+ xml_content = """
+
+
+
+
+
+
+
+
+
+
+ """
+ f = tmp_path / "coverage.xml"
+ f.write_text(xml_content)
+
+ parser = CoverageParser(str(f))
+ uncovered = parser.get_uncovered_files()
+
+ assert "uncovered.py" in uncovered
+ assert "covered.py" not in uncovered