Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions mitreattack/attackToExcel/attackToExcel.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ def build_dataframes(src: MemoryStore, domain: str) -> Dict:
}
return df

def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]:
"""Build sheets for ds-an-lg.xlsx using existing relationship tables."""
# Use existing DetectionStrategy -> Analytics relationship table
ds_an = dataframes["detectionstrategies"].get(
"detectionstrategies-analytic",
pd.DataFrame()
)

# Use existing Analytics -> LogSource relationship table
an_lg = dataframes["analytics"].get(
"analytic-logsource",
pd.DataFrame()
)

# Use existing Analytics -> Detection Strategy relationship table
an_ds = dataframes["analytics"].get(
"analytic-detectionstrategy",
pd.DataFrame()
)

return {
"detectionstrategy_to_analytics": ds_an,
"analytics_to_logsources": an_lg,
"analytics_to_detectionstrategy": an_ds
}

def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, output_dir: str = ".") -> List:
"""Given a set of dataframes from build_dataframes, write the ATT&CK dataset to output directory.
Expand Down Expand Up @@ -292,6 +317,18 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
)

written_files.append(master_fp)

# Write Detection strategy - Analytics - Log sources file
ds_an_lg_frames = build_ds_an_lg_relationships(dataframes)
ds_an_lg_fp = os.path.join(output_directory, f"{domain_version_string}-detectionstrategy-anlytic-logsources.xlsx")

with pd.ExcelWriter(ds_an_lg_fp) as rel_writer:
for sheet_name, df in ds_an_lg_frames.items():
if not df.empty:
df.to_excel(rel_writer, sheet_name=sheet_name, index=False)

written_files.append(ds_an_lg_fp)

for thefile in written_files:
logger.info(f"Excel file created: {thefile}")
return written_files
Expand Down
89 changes: 78 additions & 11 deletions mitreattack/attackToExcel/stixToDf.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@
return dataframes


def analyticsToDf(src):

Check failure on line 361 in mitreattack/attackToExcel/stixToDf.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=mitre-attack_mitreattack-python&issues=AZr_NtGJ4aChBanh-A3t&open=AZr_NtGJ4aChBanh-A3t&pullRequest=217
"""Parse STIX Analytics from the given data and return corresponding pandas dataframes.

:param src: MemoryStore or other stix2 DataSource object holding the domain data
Expand All @@ -367,16 +367,64 @@
analytics = src.query([Filter("type", "=", "x-mitre-analytic")])
analytics = remove_revoked_deprecated(analytics)

# Detection strategies (needed for analytics to detection strategies relationship)
detection_strategies = src.query([Filter("type", "=", "x-mitre-detection-strategy")])
detection_strategies = remove_revoked_deprecated(detection_strategies)

dataframes = {}
if analytics:
analytic_rows = []
logsource_rows = []
analytic_to_ds_rows = []

# analytics to detection strategies
analytic_to_ds_map = {}
for ds in detection_strategies:
for analytic_id in ds.get("x_mitre_analytic_refs", []):
analytic_to_ds_map.setdefault(analytic_id, []).append({
"detection_strategy_id": ds["id"],
"detection_strategy_name": ds.get("name", "")
})

for analytic in tqdm(analytics, desc="parsing analytics"):
analytic_rows.append(parseBaseStix(analytic))

# log-source relationship table rows
for logsrc in analytic.get("x_mitre_log_source_references", []):
data_comp_id = logsrc.get("x_mitre_data_component_ref", "")
data_comp = src.get(data_comp_id)
data_comp_name = data_comp.get("name", "") if data_comp else ""

logsource_rows.append({
"analytic_id": analytic["id"],
"analytic_name": analytic["external_references"][0]["external_id"],
"data_component_id": data_comp_id,
"data_component_name": data_comp_name,
"log_source_name": logsrc.get("name", ""),
"channel": logsrc.get("channel", "")
})

# detection strategies relationship table rows
for ds_info in analytic_to_ds_map.get(analytic["id"], []):
analytic_to_ds_rows.append({
"analytic_id": analytic["id"],
"analytic_name": analytic["external_references"][0]["external_id"],
"detection_strategy_id": ds_info["detection_strategy_id"],
"detection_strategy_name": ds_info["detection_strategy_name"]
})

citations = get_citations(analytics)

Check warning on line 416 in mitreattack/attackToExcel/stixToDf.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this assignment to local variable 'citations'; the value is never used.

See more on https://sonarcloud.io/project/issues?id=mitre-attack_mitreattack-python&issues=AZr_NtGJ4aChBanh-A3s&open=AZr_NtGJ4aChBanh-A3s&pullRequest=217
dataframes["analytics"] = (
pd.DataFrame(analytic_rows).sort_values("name")
)
dataframes["analytic-logsource"] = (
pd.DataFrame(logsource_rows)
)
dataframes["analytic-detectionstrategy"] = (
pd.DataFrame(analytic_to_ds_rows)
)

citations = get_citations(analytics)
dataframes = {
"analytics": pd.DataFrame(analytic_rows).sort_values("name"),
}
if not citations.empty:
dataframes["citations"] = citations.sort_values("reference")

Expand All @@ -398,20 +446,40 @@
dataframes = {}
if detection_strategies:
detection_strategy_rows = []
rel_rows = []
for detection_strategy in tqdm(detection_strategies, desc="parsing detection strategies"):
detection_strategy_rows.append(parseBaseStix(detection_strategy))
row = parseBaseStix(detection_strategy)
row["analytic_refs"] = (
"; ".join(detection_strategy.get("x_mitre_analytic_refs", []))
)
detection_strategy_rows.append(row)

# analytics relationship table rows
for analytic_id in detection_strategy.get("x_mitre_analytic_refs", []):
analytic_obj = src.get(analytic_id)

rel_rows.append({
"detection_strategy_id": detection_strategy["id"],
"detection_strategy_name": detection_strategy.get("name", ""),
"analytic_id": analytic_id,
"analytic_name": analytic_obj["external_references"][0]["external_id"],
})

# Build main dataframes
dataframes["detectionstrategies"] = (
pd.DataFrame(detection_strategy_rows).sort_values("name")
)

dataframes["detectionstrategies-analytic"] = (
pd.DataFrame(rel_rows)
)

citations = get_citations(detection_strategies)
dataframes = {
"detectionstrategies": pd.DataFrame(detection_strategy_rows).sort_values("name"),
}
if not citations.empty:
if "citations" in dataframes: # append to existing citations from references
dataframes["citations"] = citations.sort_values("reference")
dataframes["citations"] = citations.sort_values("reference")

else:
logger.warning("No detection strategies found - nothing to parse")

return dataframes


Expand Down Expand Up @@ -956,7 +1024,6 @@
# end adding of matrices
return matrices_parsed, sub_matrices_parsed


def relationshipsToDf(src, relatedType=None):
"""Parse STIX relationships from the given data and return corresponding pandas dataframes.

Expand Down
Loading