Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions codesectools/datasets/BenchmarkJava/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class BenchmarkJava(PrebuiltFileDataset):
Attributes:
name (str): The name of the dataset, "BenchmarkJava".
supported_languages (list[str]): A list of supported programming languages.
license (str): The license under which the dataset is distributed.
license_url (str): A URL to the full text of the license.
build_command (str): The command to build the Java project.
prebuilt_expected (tuple): A tuple defining the path and glob pattern for expected build artifacts.
artefacts_arg (str): The argument to specify the location of build artifacts for SAST tools.

"""

Expand All @@ -64,6 +69,7 @@ class BenchmarkJava(PrebuiltFileDataset):

build_command = "mvn clean compile"
prebuilt_expected = (Path("target/classes/org/owasp/benchmark/testcode"), "*.class")
artefacts_arg = "."

def __init__(self, lang: None | str = None) -> None:
"""Initialize the BenchmarkJava dataset.
Expand Down
21 changes: 11 additions & 10 deletions codesectools/datasets/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,19 @@ def list_dataset_full_names(cls) -> list[str]:


class PrebuiltDatasetMixin:
"""Provide functionality for datasets that require a build step."""
"""Provide functionality for datasets that require a build step.

Attributes:
build_command (str): The command required to build the dataset.
prebuilt_expected (tuple[Path, str]): A tuple containing the path and glob pattern
to find the built artifacts.
artefacts_arg (str): The argument to pass to the SAST tool command template.

"""

build_command: str
prebuilt_expected: tuple[Path, str]
artefacts_arg: str

def is_built(self) -> bool:
"""Check if the dataset has been built."""
Expand Down Expand Up @@ -271,15 +280,7 @@ def save(self, dir: Path) -> None:


class FileDataset(Dataset):
"""Abstract base class for datasets composed of individual files.

Attributes:
directory (Path): The directory path for the dataset.
lang (str): The programming language of the dataset.
full_name (str): The full name of the dataset, including the language.
files (list[File]): A list of `File` objects loaded from the dataset.

"""
"""Abstract base class for datasets composed of individual files."""

def __init__(self, lang: str) -> None:
"""Initialize a FileDataset instance.
Expand Down
85 changes: 26 additions & 59 deletions codesectools/sasts/core/sast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from pathlib import Path
from typing import Any, Literal, Union

import git
from rich import print
from rich.panel import Panel
from rich.progress import Progress
Expand Down Expand Up @@ -123,9 +122,15 @@ def run_analysis(
render_variables[to_replace] = v
elif isinstance(v, Path):
render_variables[to_replace] = str(v.resolve())
elif isinstance(v, list):
render_variables[to_replace] = v
else:
raise NotImplementedError(k, v)

# Make temporary directory available to command
temp_dir = tempfile.TemporaryDirectory()
render_variables["{tempdir}"] = temp_dir.name

with Progress() as progress:
progress.add_task(
f"[b][{self.name}][/b] analyzing: [i]{project_dir.name}[/i]",
Expand Down Expand Up @@ -165,7 +170,7 @@ def save_results(self, project_dir: Path, output_dir: Path, extra: dict) -> None

"""
output_dir.mkdir(exist_ok=True, parents=True)
json.dump(extra, (output_dir / "cstools_output.json").open("w"))
json.dump(extra, (output_dir / "cstools_output.json").open("w"), indent=4)

missing_files = []
for path_from_root, required in self.output_files:
Expand All @@ -175,7 +180,7 @@ def save_results(self, project_dir: Path, output_dir: Path, extra: dict) -> None
filepath = project_dir / parent_dir / filename
if filepath.is_file():
if not filepath == output_dir / filename:
shutil.copy2(filepath, output_dir / filename)
filepath.rename(output_dir / filename)
else:
if required:
missing_files.append(filename)
Expand All @@ -184,7 +189,7 @@ def save_results(self, project_dir: Path, output_dir: Path, extra: dict) -> None
if filepaths:
for filepath in filepaths:
if not filepath == output_dir / filename:
shutil.copy2(filepath, output_dir / filepath.name)
filepath.rename(output_dir / filepath.name)
else:
if required:
missing_files.append(filename)
Expand Down Expand Up @@ -218,25 +223,7 @@ def analyze_files(
)
return

# Create temporary directory for the project
temp_dir = tempfile.TemporaryDirectory()
temp_path = Path(temp_dir.name)

# Copy files into the temporary directory
if testing:
random.seed(os.environ.get("CONSTANT_RANDOM", os.urandom(16)))
files = random.sample(dataset.files, k=2)
else:
files = dataset.files

for file in files:
file.save(temp_path)

# Run analysis
self.run_analysis(dataset.lang, temp_path, result_path)

# Clear temporary directory
temp_dir.cleanup()
self.run_analysis(dataset.lang, dataset.directory, result_path)

def analyze_repos(
self, dataset: GitRepoDataset, overwrite: bool = False, testing: bool = False
Expand All @@ -252,8 +239,8 @@ def analyze_repos(
testing: If True, run analysis on a sample of two small random repositories for testing purposes.

"""
base_result_path = self.output_dir / dataset.full_name
base_result_path.mkdir(exist_ok=True, parents=True)
result_path = self.output_dir / dataset.full_name
result_path.mkdir(exist_ok=True, parents=True)

if testing:
random.seed(os.environ.get("CONSTANT_RANDOM", os.urandom(16)))
Expand All @@ -263,27 +250,22 @@ def analyze_repos(
repos = dataset.repos

for repo in repos:
result_path = base_result_path / repo.name
if result_path.is_dir():
if list(result_path.iterdir()) and not overwrite:
repo_result_path = result_path / repo.name
if repo_result_path.is_dir():
if list(repo_result_path.iterdir()) and not overwrite:
print(f"Results already exist for {repo.name}, skipping...")
print("Please use --overwrite to analyze again")
continue

# Create temporary directory for the project
temp_dir = tempfile.TemporaryDirectory()
repo_path = Path(temp_dir.name)
repo_source_path = dataset.directory / repo.name

# Clone and checkout to the vulnerable commit
try:
repo.save(repo_path)
except git.GitCommandError:
continue
if repo_source_path.is_dir():
shutil.rmtree(repo_source_path)

# Run analysis
self.run_analysis(dataset.lang, repo_path, result_path)
repo_source_path.mkdir()
repo.save(repo_source_path)

# Clear temporary directory
temp_dir.cleanup()
self.run_analysis(dataset.lang, repo_source_path, repo_result_path)

@property
def supported_dataset_full_names(self) -> list[str]:
Expand Down Expand Up @@ -399,28 +381,13 @@ def analyze_files(
)
return

# Create temporary directory for the project
temp_dir = tempfile.TemporaryDirectory()
temp_path = Path(temp_dir.name)

# Copy files into the temporary directory
if testing:
random.seed(os.environ.get("CONSTANT_RANDOM", os.urandom(16)))
prebuilt_files = random.sample(dataset.list_prebuilt_files(), k=2)
else:
prebuilt_files = dataset.list_prebuilt_files()

for prebuilt_file in prebuilt_files:
shutil.copy2(prebuilt_file, temp_path / prebuilt_file.name)

# Run analysis
self.run_analysis(
dataset.lang, dataset.directory, result_path, artifacts=temp_path
dataset.lang,
dataset.directory,
result_path,
artifacts=dataset.artefacts_arg,
)

# Clear temporary directory
temp_dir.cleanup()


class PrebuiltBuildlessSAST(PrebuiltSAST, BuildlessSAST):
"""Represent a SAST tool that can analyze both source code and pre-built artifacts."""
Expand Down
3 changes: 3 additions & 0 deletions codesectools/sasts/tools/Cppcheck/sast.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SASTRequirements,
)
from codesectools.sasts.tools.Cppcheck.parser import CppcheckAnalysisResult
from codesectools.utils import CPU_COUNT


class CppcheckSAST(PrebuiltBuildlessSAST):
Expand Down Expand Up @@ -50,6 +51,8 @@ class CppcheckSAST(PrebuiltBuildlessSAST):
"--enable=all",
"--xml",
"--output-file=cppcheck_output.xml",
"--cppcheck-build-dir={tempdir}",
f"-j{CPU_COUNT}",
]
]
valid_codes = [0]
Expand Down
59 changes: 46 additions & 13 deletions codesectools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import os
import re
import subprocess
from collections.abc import Sequence
from importlib.resources import files
Expand Down Expand Up @@ -39,31 +40,60 @@ def DEBUG() -> bool:


# Subprocess wrapper
def render_command(command: list[str], map: dict[str, str]) -> list[str]:
def get_pattern(arg: str, mapping: dict[str, str]) -> str | None:
"""Find a placeholder pattern like '{placeholder}' in an argument string.

Args:
arg: The string to search for a pattern.
mapping: A dictionary of placeholders, kept for contextual consistency
with `render_command`.

Returns:
The found pattern string (e.g., '{placeholder}') or None if not found.

"""
if m := re.search(r"\{.*\}", arg):
return m.group(0)


def render_command(command: list, mapping: dict[str, str]) -> list[str]:
"""Render a command template by replacing placeholders with values.

Substitutes placeholders in a command list from a given map. It handles
simple string arguments and optional arguments represented as tuples.
If a mapped value is a list, the argument is expanded.

Args:
command: The command template as a list of strings.
map: A dictionary of placeholders to their replacement values.
command: The command template, which can contain strings and tuples
of the form `(default, optional_template)`.
mapping: A dictionary of placeholders to their replacement values.

Returns:
The rendered command as a list of strings.

"""
_command = command.copy()
for pattern, value in map.items():
for i, arg in enumerate(_command):
# Check if optional argument can be used
if isinstance(arg, tuple):
default_arg, optional_arg = arg
if pattern in optional_arg:
_command[i] = arg.replace(pattern, value)
for i, arg in enumerate(_command):
# Check if optional argument can be used
if isinstance(arg, tuple):
default_arg, optional_arg = arg

if pattern := get_pattern(optional_arg, mapping):
_command[i] = optional_arg.replace(pattern, mapping[pattern])
elif pattern := get_pattern(default_arg, mapping):
_command[i] = default_arg.replace(pattern, mapping[pattern])
else:
if pattern := get_pattern(arg, mapping):
value = mapping[pattern]
if isinstance(value, list):
_command[i] = " ".join(
arg.replace(pattern, subvalue) for subvalue in value
)
else:
_command[i] = default_arg
else:
if pattern in arg:
_command[i] = arg.replace(pattern, value)

_command = " ".join(_command).split(" ")

# Remove not rendered part of the command:
__command = []
for part in _command:
Expand Down Expand Up @@ -193,3 +223,6 @@ def shorten_path(p: str) -> str:
if len(path.parts) > 3:
return str(Path("...") / path.parts[-2] / path.parts[-1])
return p


# Number of CPUs to use for parallel tool invocations (e.g. cppcheck's -jN).
# os.cpu_count() may return None when the count is undeterminable; fall back
# to 1 so interpolations like f"-j{CPU_COUNT}" never produce "-jNone".
CPU_COUNT = os.cpu_count() or 1