109 changes: 48 additions & 61 deletions codesage/cli/commands/snapshot.py
@@ -98,7 +98,7 @@ def _create_snapshot_data(path, project_name):
@snapshot.command('create')
@click.argument('path', type=click.Path(exists=True, dir_okay=True))
@click.option('--project', '-p', 'project_name_override', help='Override the project name.')
@click.option('--format', '-f', type=click.Choice(['json', 'python-semantic-digest', 'go-semantic-digest']), default='json', help='Snapshot format.')
@click.option('--format', '-f', type=click.Choice(['yaml', 'json', 'md']), default='yaml', help='Snapshot format.')
@click.option('--output', '-o', type=click.Path(), default=None, help='Output file path.')
@click.option('--compress', is_flag=True, help='Enable compression.')
@click.option('--language', '-l', type=click.Choice(['python', 'go', 'shell', 'java', 'auto']), default='auto', help='Language to analyze.')
@@ -110,82 +110,69 @@ def create(ctx, path, project_name_override, format, output, compress, language)
try:
root_path = Path(path)

if format in ['python-semantic-digest', 'go-semantic-digest']:
if output is None:
output = f"{root_path.name}_{language}_semantic_digest.yaml"
if language == 'auto':
if list(root_path.rglob("*.py")):
language = "python"
elif list(root_path.rglob("*.go")):
language = "go"
elif list(root_path.rglob("*.java")):
language = "java"
elif list(root_path.rglob("*.sh")):
language = "shell"
else:
click.echo("Could not auto-detect language.", err=True)
return

if language in ['python', 'go']:
config = SnapshotConfig()
builder = None

if language == 'auto':
if format == 'python-semantic-digest':
language = 'python'
elif format == 'go-semantic-digest':
language = 'go'
else:
# Fallback for auto-detection if format doesn't imply language
if list(root_path.rglob("*.py")):
language = "python"
elif list(root_path.rglob("*.go")):
language = "go"
elif list(root_path.rglob("*.java")):
language = "java"
elif list(root_path.rglob("*.sh")):
language = "shell"
else:
click.echo("Could not auto-detect language.", err=True)
return

if language == 'python' and format == 'python-semantic-digest':
if language == 'python':
builder = PythonSemanticSnapshotBuilder(root_path, config)
elif language == 'go' and format == 'go-semantic-digest':
else: # language == 'go'
builder = GoSemanticSnapshotBuilder(root_path, config)
# Preserve other language builders for future use, but they won't be triggered
# by the current format options.
elif language == 'shell':
builder = ShellSemanticSnapshotBuilder(root_path, config)
elif language == 'java':
builder = JavaSemanticSnapshotBuilder(root_path, config)
else:
click.echo(f"Unsupported language/format combination: {language}/{format}", err=True)
return

project_snapshot = builder.build()

generator = YAMLGenerator()
generator.export(project_snapshot, Path(output))

click.echo(f"{language.capitalize()} semantic digest created at {output}")
return

snapshot_data = _create_snapshot_data(path, project_name)
if output is None:
output = f"{root_path.name}_snapshot.{format}"

if output:
output_path = Path(output)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Use model_dump_json for consistency
with open(output_path, 'w', encoding='utf-8') as f:
f.write(snapshot_data.model_dump_json(indent=2))

click.echo(f"Snapshot created at {output}")
else:
manager = SnapshotVersionManager(SNAPSHOT_DIR, project_name, DEFAULT_SNAPSHOT_CONFIG['snapshot'])
if format == 'yaml':
generator = YAMLGenerator()
generator.export(project_snapshot, output_path)
elif format == 'json':
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(project_snapshot, f, indent=2)
elif format == 'md':
click.echo("Markdown format is not yet implemented.", err=True)
return

# The format for saving via manager is 'json', not the input format for semantic digests
save_format = 'json'
click.echo(f"Snapshot created at {output}")

if compress:
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
else: # Fallback to original snapshot logic for other languages
snapshot_data = _create_snapshot_data(path, project_name)

# Compress the file
with open(snapshot_path, 'rb') as f_in:
with gzip.open(f"{snapshot_path}.gz", 'wb') as f_out:
f_out.writelines(f_in)
os.remove(snapshot_path)
click.echo(f"Compressed snapshot created at {snapshot_path}.gz")
if output:
output_path = Path(output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(snapshot_data.model_dump_json(indent=2))
click.echo(f"Snapshot created at {output}")
else:
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
click.echo(f"Snapshot created at {snapshot_path}")
manager = SnapshotVersionManager(SNAPSHOT_DIR, project_name, DEFAULT_SNAPSHOT_CONFIG['snapshot'])
save_format = 'json'
if compress:
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
with open(snapshot_path, 'rb') as f_in:
with gzip.open(f"{snapshot_path}.gz", 'wb') as f_out:
f_out.writelines(f_in)
os.remove(snapshot_path)
click.echo(f"Compressed snapshot created at {snapshot_path}.gz")
else:
snapshot_path = manager.save_snapshot(snapshot_data, save_format)
click.echo(f"Snapshot created at {snapshot_path}")
finally:
audit_logger.log(
AuditEvent(
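Reviewer note: with the semantic-digest format names removed, language auto-detection in `create` reduces to a first-match scan over file extensions. A standalone sketch of that logic, assuming the same extension order as the diff (the helper name is illustrative, not part of the change):

from pathlib import Path

def detect_language(root_path: Path):
    """First match wins, mirroring the order used in create(): .py, .go, .java, .sh."""
    for ext, lang in ((".py", "python"), (".go", "go"), (".java", "java"), (".sh", "shell")):
        if next(root_path.rglob(f"*{ext}"), None) is not None:
            return lang
    return None  # caller echoes "Could not auto-detect language." and returns
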
70 changes: 38 additions & 32 deletions codesage/semantic_digest/go_snapshot_builder.py
@@ -233,8 +233,8 @@
if len(f.Names) == 0 {
ps = append(ps, typeStr)
} else {
for _, name := range f.Names {
ps = append(ps, name.Name+" "+typeStr)
for range f.Names {
ps = append(ps, typeStr) // Simplified: store only the type to save tokens
}
}
}
@@ -264,25 +264,41 @@
"""

class GoSemanticSnapshotBuilder(BaseLanguageSnapshotBuilder):
def build(self) -> Dict[str, Any]:
has_go = False
_parser_bin_path = None
_temp_dir = None

def __init__(self, root_path: Path, config: SnapshotConfig):
super().__init__(root_path, config)
self._setup_parser()

def _setup_parser(self):
if GoSemanticSnapshotBuilder._parser_bin_path:
return

try:
subprocess.check_call(["go", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
has_go = True
GoSemanticSnapshotBuilder._temp_dir = tempfile.TemporaryDirectory()
parser_src_path = os.path.join(GoSemanticSnapshotBuilder._temp_dir.name, "parser.go")
with open(parser_src_path, "w", encoding="utf-8") as f:
f.write(GO_AST_PARSER_SRC)

parser_bin_path = os.path.join(GoSemanticSnapshotBuilder._temp_dir.name, "parser")
subprocess.run(["go", "build", "-o", parser_bin_path, parser_src_path], capture_output=True, text=True, check=True)
GoSemanticSnapshotBuilder._parser_bin_path = parser_bin_path
except (subprocess.CalledProcessError, FileNotFoundError):
pass
GoSemanticSnapshotBuilder._parser_bin_path = None

def build(self) -> Dict[str, Any]:
digest = {
"root": self.root_path.name, "pkgs": {}, "graph": {}, "meta": {}
}

pkg_map = defaultdict(list)
all_files = self._collect_files()
total_cx = 0
total_err_checks = 0

for fpath in all_files:
data = self._extract_semantics(fpath, has_go)
data = self._extract_semantics(fpath)
pkg_name = data.get("pk", "unknown")
clean_data = {k: v for k, v in data.items() if v}
clean_data["f"] = str(fpath.relative_to(self.root_path))
@@ -295,9 +311,6 @@ def build(self) -> Dict[str, Any]:
"er": data["stat"].get("er", 0),
}

if "fn" in data:
total_cx += sum(fn.get("cx", 1) for fn in data["fn"])

pkg_map[pkg_name].append(clean_data)

deps = {imp for imp in data.get("im", []) if "." in imp}
@@ -315,34 +328,27 @@

digest["meta"] = {
"files": len(all_files), "pkgs": len(pkg_map),
"total_complexity": total_cx, "error_hotspots": total_err_checks,
"strategy": "AST" if has_go else "Regex"
"error_hotspots": total_err_checks,
"strategy": "AST" if GoSemanticSnapshotBuilder._parser_bin_path else "Regex"
}

return digest

def _collect_files(self) -> List[Path]:
return list(self.root_path.rglob("*.go"))

def _extract_semantics(self, file_path: Path, has_go: bool) -> Dict[str, Any]:
if has_go:
with tempfile.TemporaryDirectory() as temp_dir:
parser_src_path = os.path.join(temp_dir, "parser.go")
with open(parser_src_path, "w", encoding="utf-8") as f:
f.write(GO_AST_PARSER_SRC)

parser_bin_path = os.path.join(temp_dir, "parser")
try:
build_result = subprocess.run(["go", "build", "-o", parser_bin_path, parser_src_path], capture_output=True, text=True, check=True)
cmd = [parser_bin_path, str(file_path)]
output = subprocess.check_output(cmd, stderr=subprocess.PIPE, timeout=15)
return json.loads(output.decode('utf-8'))
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as e:
print(f"AST parsing failed for {file_path}: {e}")
if isinstance(e, subprocess.CalledProcessError):
print(f"Stderr: {e.stderr}")
if hasattr(e, 'stdout'):
print(f"Stdout: {e.stdout}")
def _extract_semantics(self, file_path: Path) -> Dict[str, Any]:
if GoSemanticSnapshotBuilder._parser_bin_path:
try:
cmd = [GoSemanticSnapshotBuilder._parser_bin_path, str(file_path)]
output = subprocess.check_output(cmd, stderr=subprocess.PIPE, timeout=15)
return json.loads(output.decode('utf-8'))
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as e:
print(f"AST parsing failed for {file_path}: {e}")
if isinstance(e, subprocess.CalledProcessError):
print(f"Stderr: {e.stderr}")
if hasattr(e, 'stdout'):
print(f"Stdout: {e.stdout}")

# Fallback to regex
content = file_path.read_text(encoding="utf-8", errors="ignore")
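Reviewer note: since the Go parser binary is now built once in `_setup_parser` and cached on the class, a single `go build` serves every file in the run (and every builder instance in the process). A hedged usage sketch; the import path for `SnapshotConfig` is an assumption, while the `meta` keys come straight from the diff:

from pathlib import Path
from codesage.semantic_digest.go_snapshot_builder import GoSemanticSnapshotBuilder
# Where SnapshotConfig actually lives is assumed; point this import at the real module.
from codesage.semantic_digest.config import SnapshotConfig

builder = GoSemanticSnapshotBuilder(Path("./some_go_repo"), SnapshotConfig())
digest = builder.build()
print(digest["meta"]["strategy"])  # "AST" when the Go toolchain is available, otherwise "Regex"
print(digest["meta"]["files"], "files in", digest["meta"]["pkgs"], "packages")
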
11 changes: 11 additions & 0 deletions codesage/semantic_digest/python_snapshot_builder.py
@@ -102,6 +102,17 @@ def build(self) -> Dict[str, Any]:
digest["deps"][module_name].add(module)

self._finalize_digest(digest, total_ccn, all_imports)

# Convert defaultdicts to dicts for clean output
final_modules = {}
for name, data in digest["modules"].items():
data["fim"] = dict(data["fim"])
data["dc"] = sorted(list(data["dc"]))
final_modules[name] = data
digest["modules"] = final_modules
digest["deps"] = {mod: sorted(list(deps)) for mod, deps in digest["deps"].items()}


return digest

def _collect_files(self) -> List[Path]:
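Reviewer note: the conversion block added here matters because `defaultdict` and `set` serialize poorly; `json.dump` rejects sets outright, and PyYAML either emits Python-specific tags or, with the safe dumper, refuses them. A tiny isolated illustration of the same normalization, with made-up module names:

import json
from collections import defaultdict

deps = defaultdict(set)
deps["codesage.cli"].update({"click", "pathlib"})

# json.dumps(deps) would raise: TypeError: Object of type set is not JSON serializable
clean = {mod: sorted(values) for mod, values in deps.items()}
print(json.dumps(clean, indent=2))  # {"codesage.cli": ["click", "pathlib"]}
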
45 changes: 20 additions & 25 deletions codesage/snapshot/versioning.py
@@ -83,31 +83,26 @@ def _update_index(self, snapshot_path: str, metadata: SnapshotMetadata):
self._save_index(index)

def _get_expired_snapshots(self, index: List[Dict[str, Any]], now: datetime) -> List[Dict[str, Any]]:
"""Identifies expired snapshots based on retention policies."""

def parse_timestamp(ts_str):
ts = datetime.fromisoformat(ts_str)
if ts.tzinfo is None:
return ts.replace(tzinfo=timezone.utc)
return ts

try:
sorted_snapshots = sorted(
index,
key=lambda s: parse_timestamp(s["timestamp"]),
reverse=True
)
except (ValueError, TypeError):
return []

kept_snapshots = sorted_snapshots[:self.max_versions]

kept_by_date = {
s['version'] for s in kept_snapshots
if (now - parse_timestamp(s["timestamp"])) <= timedelta(days=self.retention_days)
}

return [s for s in index if s["version"] not in kept_by_date]
"""Identifies expired snapshots."""
valid_snapshots = []
for s in index:
try:
ts = datetime.fromisoformat(s["timestamp"])
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if now - ts <= timedelta(days=self.retention_days):
valid_snapshots.append(s)
except ValueError:
# Skip malformed timestamps
continue

if len(valid_snapshots) > self.max_versions:
valid_snapshots = sorted(
valid_snapshots, key=lambda s: s["timestamp"], reverse=True
)[:self.max_versions]

valid_versions = {s["version"] for s in valid_snapshots}
return [s for s in index if s["version"] not in valid_versions]

def cleanup_expired_snapshots(self) -> int:
"""Removes expired snapshots and returns the count of deleted files."""
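Reviewer note: under the rewritten policy a snapshot survives only if it is both younger than `retention_days` and among the `max_versions` newest of those survivors; everything else is reported as expired. A self-contained check of that behaviour (field names follow the diff, the surrounding manager setup is simplified away, and the timestamps here are already tz-aware):

from datetime import datetime, timedelta, timezone

now = datetime(2024, 6, 10, tzinfo=timezone.utc)
retention_days, max_versions = 30, 2
index = [
    {"version": "v1", "timestamp": "2024-06-09T00:00:00+00:00"},  # fresh
    {"version": "v2", "timestamp": "2024-06-01T00:00:00+00:00"},  # fresh
    {"version": "v3", "timestamp": "2024-05-20T00:00:00+00:00"},  # fresh, but over the version cap
    {"version": "v4", "timestamp": "2024-03-01T00:00:00+00:00"},  # older than retention_days
]
# Mirrors _get_expired_snapshots: filter by age, then keep only the newest max_versions.
fresh = [s for s in index
         if now - datetime.fromisoformat(s["timestamp"]) <= timedelta(days=retention_days)]
kept = sorted(fresh, key=lambda s: s["timestamp"], reverse=True)[:max_versions]
kept_versions = {s["version"] for s in kept}
expired = [s["version"] for s in index if s["version"] not in kept_versions]
print(expired)  # ['v3', 'v4']
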
8 changes: 0 additions & 8 deletions go_test_codesage.yaml

This file was deleted.

8 changes: 0 additions & 8 deletions go_test_script.yaml

This file was deleted.
