87 changes: 77 additions & 10 deletions codesage/cli/commands/scan.py
@@ -13,6 +13,10 @@
from codesage.cli.plugin_loader import PluginManager
from codesage.history.store import StorageEngine
from codesage.core.interfaces import CodeIssue
from codesage.risk.risk_scorer import RiskScorer
from codesage.config.risk_baseline import RiskBaselineConfig
from codesage.rules.jules_specific_rules import JULES_RULESET
from codesage.rules.base import RuleContext
from datetime import datetime, timezone

def get_builder(language: str, path: Path):
@@ -144,8 +148,10 @@ def merge_snapshots(snapshots: List[ProjectSnapshot], project_name: str) -> Proj
@click.option('--ci-mode', is_flag=True, help='Enable CI mode (auto-detect GitHub environment).')
@click.option('--plugins-dir', default='.codesage/plugins', help='Directory containing plugins.')
@click.option('--db-url', default='sqlite:///codesage.db', help='Database URL for storage.')
@click.option('--git-repo', type=click.Path(), help='Path to the Git repository (used for change-history analysis).')
@click.option('--coverage-report', type=click.Path(), help='Path to the coverage report (Cobertura/JaCoCo XML).')
@click.pass_context
def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url):
def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_dir, db_url, git_repo, coverage_report):
"""
Scan the codebase and report issues.
"""
Expand Down Expand Up @@ -205,16 +211,73 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d
click.echo(f"Failed to merge snapshots: {e}", err=True)
ctx.exit(1)

# 3. Apply Custom Rules (Plugins)
# Populate file contents if missing (needed for rules)
click.echo("Populating file contents...")
for file_snapshot in snapshot.files:
if not file_snapshot.content:
try:
full_path = root_path / file_snapshot.path
if full_path.exists():
file_snapshot.content = full_path.read_text(errors='ignore')
# Update size if missing
if file_snapshot.size is None:
file_snapshot.size = len(file_snapshot.content)
except Exception as e:
# logger.warning(f"Failed to read file {file_snapshot.path}: {e}")
pass

# 3. Apply Risk Scoring (Enhanced in Phase 1)
try:
risk_config = RiskBaselineConfig() # Load default config
scorer = RiskScorer(
config=risk_config,
repo_path=git_repo or path, # Default to scanned path if not specified
coverage_report=coverage_report
)
snapshot = scorer.score_project(snapshot)
except Exception as e:
click.echo(f"Warning: Risk scoring failed: {e}", err=True)

# 4. Apply Custom Rules (Plugins & Jules Rules)

# Create RuleContext
# RuleContext requires a config object. The Jules rules may not use it, but the
# RuleContext definition in base.py expects a RulesPythonBaselineConfig, so
# import it here and fall back to its defaults.
from codesage.config.rules_python_baseline import RulesPythonBaselineConfig
rule_config = RulesPythonBaselineConfig() # Default config

# Apply Jules Specific Rules
click.echo("Applying Jules-specific rules...")
for rule in JULES_RULESET:
for file_snapshot in snapshot.files:
try:
# Create context for this file
rule_ctx = RuleContext(
project=snapshot,
file=file_snapshot,
config=rule_config
)

# Call rule.check(ctx)
# Ensure rule supports check(ctx)
issues = rule.check(rule_ctx)

if issues:
if file_snapshot.issues is None:
file_snapshot.issues = []
file_snapshot.issues.extend(issues)
except Exception as e:
click.echo(f"Error applying rule {rule.rule_id} to {file_snapshot.path}: {e}", err=True)

# Apply Plugin Rules
for rule in plugin_manager.rules:
# Ensure we iterate over the list of files
for file_snapshot in snapshot.files:
file_path = Path(file_snapshot.path)
try:
content = ""
full_path = root_path / file_path
if full_path.exists():
content = full_path.read_text(errors='ignore')
# Content is already populated now
content = file_snapshot.content or ""

issues = rule.check(str(file_path), content, {})
if issues:
Expand Down Expand Up @@ -249,29 +312,33 @@ def scan(ctx, path, language, reporter, output, fail_on_high, ci_mode, plugins_d
except Exception as e:
click.echo(f"Error running rule {rule.id} on {file_path}: {e}", err=True)

# Recalculate Issues Summary after Plugins
# Simplified recalculation
# Recalculate Issues Summary after Plugins & Jules Rules
total_issues = 0
by_severity = {}
by_rule = {}

for f in snapshot.files:
if f.issues:
total_issues += len(f.issues)
for issue in f.issues:
by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1
if issue.rule_id:
by_rule[issue.rule_id] = by_rule.get(issue.rule_id, 0) + 1

# Update snapshot summary if issues changed
if snapshot.issues_summary:
snapshot.issues_summary.total_issues = total_issues
snapshot.issues_summary.by_severity = by_severity
snapshot.issues_summary.by_rule = by_rule
else:
snapshot.issues_summary = ProjectIssuesSummary(
total_issues=total_issues,
by_severity=by_severity
by_severity=by_severity,
by_rule=by_rule
)


# 4. Save to Storage
# 5. Save to Storage
if storage:
try:
storage.save_snapshot(snapshot.metadata.project_name, snapshot)
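The summary recalculation above could also be factored into a small helper; a minimal sketch (not part of this diff), assuming file snapshots expose issues with severity and rule_id attributes as in the code above:

def recompute_issues_summary(files):
    """Tally issue counts across file snapshots: total, by severity, and by rule."""
    total_issues = 0
    by_severity = {}
    by_rule = {}
    for f in files:
        for issue in (f.issues or []):
            total_issues += 1
            by_severity[issue.severity] = by_severity.get(issue.severity, 0) + 1
            if issue.rule_id:
                by_rule[issue.rule_id] = by_rule.get(issue.rule_id, 0) + 1
    return total_issues, by_severity, by_rule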
17 changes: 10 additions & 7 deletions codesage/config/risk_baseline.py
@@ -1,4 +1,4 @@
from pydantic import BaseModel
from pydantic import BaseModel, Field

class RiskBaselineConfig(BaseModel):
"""Configuration for the baseline risk scorer."""
@@ -9,13 +9,16 @@ class RiskBaselineConfig(BaseModel):
weight_fan_out: float = 0.2
weight_loc: float = 0.1

# Weights for multi-dimensional scoring
# Final = w_static * static + w_churn * churn + w_cov * (static * (1-cov))
# Or as per task: Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)
# The "Complexity" here refers to the static score calculated above.
# Weights for multi-dimensional scoring (New Model)
# Risk = w1·Complexity + w2·Churn + w3·(1-Coverage) + w4·AuthorDiversity + w5·FileSize
weight_complexity: float = Field(default=0.30, description="Weight for complexity score")
weight_churn: float = Field(default=0.25, description="Weight for git churn score")
weight_coverage: float = Field(default=0.25, description="Weight for coverage risk")
weight_author_diversity: float = Field(default=0.10, description="Weight for author diversity")
weight_file_size: float = Field(default=0.10, description="Weight for file size (LOC)")

# Legacy weights (kept for backward compatibility; the new model above supersedes them)
weight_static_score: float = 0.5
weight_churn: float = 0.3
weight_coverage_penalty: float = 0.2

# Propagation
@@ -29,7 +32,7 @@ class RiskBaselineConfig(BaseModel):

# Churn settings
churn_since_days: int = 90
threshold_churn_high: int = 10 # If file changed > 10 times in 90 days, normalized churn = 1.0
threshold_churn_high: int = 10

@classmethod
def from_defaults(cls) -> "RiskBaselineConfig":
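For illustration, a minimal sketch (not part of this diff) of how the five new weights are expected to combine — assuming each component score has already been normalized to [0, 1]; the actual combination lives in RiskScorer and may differ in detail:

from types import SimpleNamespace

def combined_risk(complexity, churn, coverage, author_diversity, file_size, cfg):
    """Risk = w1*Complexity + w2*Churn + w3*(1-Coverage) + w4*AuthorDiversity + w5*FileSize."""
    return (
        cfg.weight_complexity * complexity
        + cfg.weight_churn * churn
        + cfg.weight_coverage * (1.0 - coverage)
        + cfg.weight_author_diversity * author_diversity
        + cfg.weight_file_size * file_size
    )

# Stand-in for a RiskBaselineConfig with the documented default weights.
weights = SimpleNamespace(
    weight_complexity=0.30, weight_churn=0.25, weight_coverage=0.25,
    weight_author_diversity=0.10, weight_file_size=0.10,
)
# 0.30*0.8 + 0.25*0.6 + 0.25*(1 - 0.5) + 0.10*0.6 + 0.10*0.4 = 0.615
print(combined_risk(0.8, 0.6, 0.5, 0.6, 0.4, weights))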
121 changes: 121 additions & 0 deletions codesage/git/miner.py
@@ -0,0 +1,121 @@
"""Git 历史数据挖掘器
实现架构设计第 3.1.3 节的"代码演化分析"能力
"""
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Optional, Set
import os

try:
from git import Repo, InvalidGitRepositoryError
except ImportError:
Repo = None
InvalidGitRepositoryError = None

logger = logging.getLogger(__name__)

class GitMiner:
"""Git 历史挖掘器

核心指标(对齐架构设计):
- 变更频率: 近 N 天内的提交次数
- 文件热度: 累计变更行数 / 文件总行数 (这里简化为变更次数,后续可扩展)
- 作者分散度: 不同作者数量(高分散度 = 高风险)
"""

def __init__(self, repo_path: Optional[str] = None):
self.repo_path = repo_path or os.getcwd()
self.repo = None
self._churn_cache: Dict[str, int] = {}
self._author_cache: Dict[str, Set[str]] = {}
self._cache_initialized = False

if Repo:
try:
self.repo = Repo(self.repo_path, search_parent_directories=True)
except (InvalidGitRepositoryError, Exception) as e:
logger.warning(f"Failed to initialize Git repo at {self.repo_path}: {e}")

def _initialize_stats(self, days: int = 90):
"""Bulk process commits to populate caches."""
if self._cache_initialized:
return

if not self.repo:
return

try:
since_date = datetime.now() - timedelta(days=days)
# Iterate over all commits once: O(N_commits * M_files_changed), which beats
# querying the log per file (O(N_files * N_commits)).
commits = self.repo.iter_commits(since=since_date)

for commit in commits:
# stats.files returns dict {path: stats}
for file_path in commit.stats.files.keys():
self._churn_cache[file_path] = self._churn_cache.get(file_path, 0) + 1

if file_path not in self._author_cache:
self._author_cache[file_path] = set()
self._author_cache[file_path].add(commit.author.email)

self._cache_initialized = True
except Exception as e:
logger.error(f"Error initializing git stats: {e}")

def get_file_churn_score(self, file_path: str, days: int = 90) -> float:
"""计算文件变更频率评分(0-10)

算法: score = min(10, commit_count / (days / 30))
- 月均 1 次提交 = 1 分
- 月均 10 次提交 = 10 分(满分)
"""
if not self.repo:
return 0.0

# Ensure cache is populated
self._initialize_stats(days)

# Exact path match is required. Git paths are relative to the repo root, and
# `file_path` is usually relative as well, but normalization may be needed if
# it comes from a different source; assume consistency for now.

commit_count = self._churn_cache.get(file_path, 0)

denominator = max(days / 30, 1) # avoid division by zero
score = min(10.0, commit_count / denominator)
return round(score, 2)

def get_file_author_count(self, file_path: str) -> int:
"""统计文件的历史贡献者数量

用于评估"维护一致性风险":
- 1 人维护: 低风险(知识集中)
- 5+ 人维护: 高风险(理解成本高)
"""
if not self.repo:
return 0

self._initialize_stats()

authors = self._author_cache.get(file_path, set())
return len(authors)

def get_hotspot_files(self, top_n: int = 20) -> List[Dict]:
"""识别代码热点(高频变更文件)
"""
if not self.repo:
return []

self._initialize_stats()

sorted_files = sorted(self._churn_cache.items(), key=lambda x: x[1], reverse=True)[:top_n]

result = []
for path, count in sorted_files:
result.append({
"path": path,
"commits": count
})
return result
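A minimal usage sketch (not part of this diff), assuming the scanned project is itself a Git checkout; when GitPython is unavailable or the path is not a repository, the miner degrades to zero scores and empty results:

from codesage.git.miner import GitMiner

# Any path inside the repository works, thanks to search_parent_directories.
miner = GitMiner(".")
print(miner.get_file_churn_score("codesage/cli/commands/scan.py", days=90))
print(miner.get_file_author_count("codesage/cli/commands/scan.py"))
for hotspot in miner.get_hotspot_files(top_n=10):
    print(hotspot["path"], hotspot["commits"])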