diff --git a/mitreattack/attackToExcel/attackToExcel.py b/mitreattack/attackToExcel/attackToExcel.py index 29db7e4c..f0fe7302 100644 --- a/mitreattack/attackToExcel/attackToExcel.py +++ b/mitreattack/attackToExcel/attackToExcel.py @@ -146,6 +146,31 @@ def build_dataframes(src: MemoryStore, domain: str) -> Dict: } return df +def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]: + """Build sheets for ds-an-lg.xlsx using existing relationship tables.""" + # Use existing DetectionStrategy -> Analytics relationship table + ds_an = dataframes["detectionstrategies"].get( + "detectionstrategies-analytic", + pd.DataFrame() + ) + + # Use existing Analytics -> LogSource relationship table + an_lg = dataframes["analytics"].get( + "analytic-logsource", + pd.DataFrame() + ) + + # Use existing Analytics -> Detection Strategy relationship table + an_ds = dataframes["analytics"].get( + "analytic-detectionstrategy", + pd.DataFrame() + ) + + return { + "detectionstrategy_to_analytics": ds_an, + "analytics_to_logsources": an_lg, + "analytics_to_detectionstrategy": an_ds + } def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, output_dir: str = ".") -> List: """Given a set of dataframes from build_dataframes, write the ATT&CK dataset to output directory. @@ -292,6 +317,18 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou ) written_files.append(master_fp) + + # Write Detection strategy - Analytics - Log sources file + ds_an_lg_frames = build_ds_an_lg_relationships(dataframes) + ds_an_lg_fp = os.path.join(output_directory, f"{domain_version_string}-detectionstrategy-anlytic-logsources.xlsx") + + with pd.ExcelWriter(ds_an_lg_fp) as rel_writer: + for sheet_name, df in ds_an_lg_frames.items(): + if not df.empty: + df.to_excel(rel_writer, sheet_name=sheet_name, index=False) + + written_files.append(ds_an_lg_fp) + for thefile in written_files: logger.info(f"Excel file created: {thefile}") return written_files diff --git a/mitreattack/attackToExcel/stixToDf.py b/mitreattack/attackToExcel/stixToDf.py index bf7290f6..4c1cd552 100644 --- a/mitreattack/attackToExcel/stixToDf.py +++ b/mitreattack/attackToExcel/stixToDf.py @@ -367,16 +367,64 @@ def analyticsToDf(src): analytics = src.query([Filter("type", "=", "x-mitre-analytic")]) analytics = remove_revoked_deprecated(analytics) + # Detection strategies (needed for analytics to detection strategies relationship) + detection_strategies = src.query([Filter("type", "=", "x-mitre-detection-strategy")]) + detection_strategies = remove_revoked_deprecated(detection_strategies) + dataframes = {} if analytics: analytic_rows = [] + logsource_rows = [] + analytic_to_ds_rows = [] + + # analytics to detection strategies + analytic_to_ds_map = {} + for ds in detection_strategies: + for analytic_id in ds.get("x_mitre_analytic_refs", []): + analytic_to_ds_map.setdefault(analytic_id, []).append({ + "detection_strategy_id": ds["id"], + "detection_strategy_name": ds.get("name", "") + }) + for analytic in tqdm(analytics, desc="parsing analytics"): analytic_rows.append(parseBaseStix(analytic)) + # log-source relationship table rows + for logsrc in analytic.get("x_mitre_log_source_references", []): + data_comp_id = logsrc.get("x_mitre_data_component_ref", "") + data_comp = src.get(data_comp_id) + data_comp_name = data_comp.get("name", "") if data_comp else "" + + logsource_rows.append({ + "analytic_id": analytic["id"], + "analytic_name": analytic["external_references"][0]["external_id"], + "data_component_id": data_comp_id, + "data_component_name": data_comp_name, + "log_source_name": logsrc.get("name", ""), + "channel": logsrc.get("channel", "") + }) + + # detection strategies relationship table rows + for ds_info in analytic_to_ds_map.get(analytic["id"], []): + analytic_to_ds_rows.append({ + "analytic_id": analytic["id"], + "analytic_name": analytic["external_references"][0]["external_id"], + "detection_strategy_id": ds_info["detection_strategy_id"], + "detection_strategy_name": ds_info["detection_strategy_name"] + }) + + citations = get_citations(analytics) + dataframes["analytics"] = ( + pd.DataFrame(analytic_rows).sort_values("name") + ) + dataframes["analytic-logsource"] = ( + pd.DataFrame(logsource_rows) + ) + dataframes["analytic-detectionstrategy"] = ( + pd.DataFrame(analytic_to_ds_rows) + ) + citations = get_citations(analytics) - dataframes = { - "analytics": pd.DataFrame(analytic_rows).sort_values("name"), - } if not citations.empty: dataframes["citations"] = citations.sort_values("reference") @@ -398,20 +446,40 @@ def detectionstrategiesToDf(src): dataframes = {} if detection_strategies: detection_strategy_rows = [] + rel_rows = [] for detection_strategy in tqdm(detection_strategies, desc="parsing detection strategies"): - detection_strategy_rows.append(parseBaseStix(detection_strategy)) + row = parseBaseStix(detection_strategy) + row["analytic_refs"] = ( + "; ".join(detection_strategy.get("x_mitre_analytic_refs", [])) + ) + detection_strategy_rows.append(row) + + # analytics relationship table rows + for analytic_id in detection_strategy.get("x_mitre_analytic_refs", []): + analytic_obj = src.get(analytic_id) + + rel_rows.append({ + "detection_strategy_id": detection_strategy["id"], + "detection_strategy_name": detection_strategy.get("name", ""), + "analytic_id": analytic_id, + "analytic_name": analytic_obj["external_references"][0]["external_id"], + }) + + # Build main dataframes + dataframes["detectionstrategies"] = ( + pd.DataFrame(detection_strategy_rows).sort_values("name") + ) + + dataframes["detectionstrategies-analytic"] = ( + pd.DataFrame(rel_rows) + ) citations = get_citations(detection_strategies) - dataframes = { - "detectionstrategies": pd.DataFrame(detection_strategy_rows).sort_values("name"), - } if not citations.empty: - if "citations" in dataframes: # append to existing citations from references - dataframes["citations"] = citations.sort_values("reference") + dataframes["citations"] = citations.sort_values("reference") else: logger.warning("No detection strategies found - nothing to parse") - return dataframes @@ -956,7 +1024,6 @@ def matricesToDf(src, domain): # end adding of matrices return matrices_parsed, sub_matrices_parsed - def relationshipsToDf(src, relatedType=None): """Parse STIX relationships from the given data and return corresponding pandas dataframes.