19 changes: 18 additions & 1 deletion codesage/config/risk_baseline.py
@@ -3,17 +3,34 @@
class RiskBaselineConfig(BaseModel):
"""Configuration for the baseline risk scorer."""

# Weights for risk scoring
# Weights for risk scoring (Base static score)
weight_complexity_max: float = 0.4
weight_complexity_avg: float = 0.3
weight_fan_out: float = 0.2
weight_loc: float = 0.1

# Weights for multi-dimensional scoring
    # Final = w_static * static + w_churn * churn + w_cov * (static * (1 - cov)),
    # or, as stated in the task: Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage).
    # "Complexity" in that formula is the base static score computed from the weights above.

weight_static_score: float = 0.5
weight_churn: float = 0.3
weight_coverage_penalty: float = 0.2

# Propagation
propagation_factor: float = 0.2
propagation_iterations: int = 5

# Thresholds for complexity and risk levels
threshold_complexity_high: int = 10
threshold_risk_medium: float = 0.4
threshold_risk_high: float = 0.7

# Churn settings
churn_since_days: int = 90
    threshold_churn_high: int = 10  # Files changed more than this many times within churn_since_days get normalized churn = 1.0

@classmethod
def from_defaults(cls) -> "RiskBaselineConfig":
return cls()
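
For orientation, here is a minimal sketch of how these weights could combine into a final score. The scorer itself is not part of this diff, so normalized_churn and final_risk below are hypothetical helpers that follow the task formula quoted in the comment above:

from typing import Optional

from codesage.config.risk_baseline import RiskBaselineConfig

def normalized_churn(change_count: int, cfg: RiskBaselineConfig) -> float:
    # Churn saturates at 1.0 once a file exceeds threshold_churn_high changes.
    return min(change_count / cfg.threshold_churn_high, 1.0)

def final_risk(static_score: float, churn: float, coverage: float,
               cfg: Optional[RiskBaselineConfig] = None) -> float:
    # Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)
    cfg = cfg or RiskBaselineConfig.from_defaults()
    return (
        cfg.weight_static_score * static_score
        + cfg.weight_churn * churn
        + cfg.weight_coverage_penalty * (1.0 - coverage)
    )

# With the default weights (0.5, 0.3, 0.2):
# final_risk(0.8, 0.5, 0.3) = 0.5*0.8 + 0.3*0.5 + 0.2*0.7 = 0.69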
99 changes: 99 additions & 0 deletions codesage/history/git_miner.py
@@ -0,0 +1,99 @@
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import logging

logger = logging.getLogger(__name__)

class GitMiner:
def __init__(self, repo_path: str = "."):
self.repo_path = repo_path
self._churn_cache: Dict[str, int] = {}
self._last_modified_cache: Dict[str, datetime] = {}
self._is_initialized = False

def _run_git_cmd(self, args: List[str]) -> str:
try:
result = subprocess.run(
["git"] + args,
cwd=self.repo_path,
capture_output=True,
text=True,
check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
logger.warning(f"Git command failed: {e}")
return ""

def _initialize_stats(self, since_days: int = 90):
"""
Parses git log once to populate churn and last modified dates.
"""
if self._is_initialized:
return

since_date = (datetime.now() - timedelta(days=since_days)).strftime("%Y-%m-%d")

# Get all commits with file changes
# Format: timestamp|filename
cmd = [
"log",
f"--since={since_date}",
"--pretty=format:%at", # Timestamp
"--name-only", # List changed files
]

output = self._run_git_cmd(cmd)

current_timestamp = None

for line in output.split('\n'):
line = line.strip()
if not line:
continue

# If line is a timestamp (digits)
if line.isdigit():
current_timestamp = int(line)
continue

# Otherwise it's a filename
file_path = line
self._churn_cache[file_path] = self._churn_cache.get(file_path, 0) + 1

            if current_timestamp:
                # git log is reverse chronological (newest first), so the first
                # timestamp we see for a file is its most recent modification;
                # only set it if not already present.
                if file_path not in self._last_modified_cache:
                    self._last_modified_cache[file_path] = datetime.fromtimestamp(current_timestamp)

self._is_initialized = True

def get_file_churn(self, file_path: str, since_days: int = 90) -> int:
"""
Returns the number of times a file has been changed in the last `since_days`.
"""
self._initialize_stats(since_days)
return self._churn_cache.get(file_path, 0)

def get_last_modified(self, file_path: str) -> Optional[datetime]:
"""
Returns the last modification time of the file from git history.
"""
        self._initialize_stats()  # Populates the caches with the default window if not yet initialized
return self._last_modified_cache.get(file_path)

def get_hotspots(self, limit: int = 10, since_days: int = 90) -> List[Tuple[str, int]]:
"""
Returns the top `limit` modified files.
"""
self._initialize_stats(since_days)
sorted_files = sorted(self._churn_cache.items(), key=lambda x: x[1], reverse=True)
return sorted_files[:limit]
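
A short usage sketch for GitMiner; the file path below is just a placeholder. Because _initialize_stats parses a single git log invocation and caches the results, repeated queries do not re-spawn git:

from codesage.history.git_miner import GitMiner

miner = GitMiner(repo_path=".")

# Churn and last-modified for one file (placeholder path):
print(miner.get_file_churn("codesage/config/risk_baseline.py"))
print(miner.get_last_modified("codesage/config/risk_baseline.py"))

# Most frequently changed files in the window:
for path, count in miner.get_hotspots(limit=5):
    print(f"{path}: {count} changes")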
62 changes: 62 additions & 0 deletions codesage/risk/propagation.py
@@ -0,0 +1,62 @@
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)

class RiskPropagator:
def __init__(self, attenuation_factor: float = 0.5, max_iterations: int = 10, epsilon: float = 0.01):
self.attenuation_factor = attenuation_factor
self.max_iterations = max_iterations
self.epsilon = epsilon

def propagate(self, dependency_graph: Dict[str, List[str]], base_scores: Dict[str, float]) -> Dict[str, float]:
"""
Propagates risk scores through the dependency graph.
dependency_graph: Dict[str, List[str]] where key is a file and value is a list of files it depends on (imports).
base_scores: Dict[str, float] initial risk scores for each file.

If A depends on B (A -> B), then risk flows from B to A.
"Calling a high risk component makes you risky."
"""

final_scores = base_scores.copy()

        # If A depends on B, risk propagates B -> A: for each node we look at
        # its dependencies and add a fraction of their risk to its own score,
        #   new_score(A) = base_score(A) + sum(score(dep) * attenuation_factor).
        # A dependency's own score can rise as well (e.g. if B depends on D),
        # so the propagation runs iteratively until the scores converge.

nodes = list(base_scores.keys())

for _ in range(self.max_iterations):
changes = 0
current_scores = final_scores.copy()

for node in nodes:
# dependencies: files that 'node' imports
dependencies = dependency_graph.get(node, [])

incoming_risk = 0.0
for dep in dependencies:
if dep in current_scores:
incoming_risk += current_scores[dep] * self.attenuation_factor

                # Formula: base + propagated. On a 0-1 scale the result can
                # exceed 1.0; callers can normalize or cap the scores afterwards.

new_score = base_scores[node] + incoming_risk

if abs(new_score - final_scores[node]) > self.epsilon:
final_scores[node] = new_score
changes += 1

if changes == 0:
break

return final_scores
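
A tiny worked example of the propagation, traced by hand with a hypothetical attenuation_factor of 0.5 (the graph and base scores are made up):

from codesage.risk.propagation import RiskPropagator

# A imports B, and B imports C; B starts out the riskiest.
graph = {"A": ["B"], "B": ["C"], "C": []}
base = {"A": 0.1, "B": 0.6, "C": 0.2}

scores = RiskPropagator(attenuation_factor=0.5).propagate(graph, base)
# Iteration 1: B = 0.6 + 0.5*0.2 = 0.7, A = 0.1 + 0.5*0.6 = 0.4
# Iteration 2: A = 0.1 + 0.5*0.7 = 0.45; iteration 3 changes nothing, so it stops.
# scores is approximately {"A": 0.45, "B": 0.7, "C": 0.2}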