From 26771bf42e51121bfdbd118d8707583486103dd5 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 6 Jan 2025 13:11:04 +0000 Subject: [PATCH 1/3] Added logic to 'suggest_path' API endpoint to check for the current visit under the previous year in the event the year rolls over during a data collection session --- src/murfey/server/api/__init__.py | 24 ++++++++++++++++++++++++ src/murfey/server/demo_api.py | 27 +++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 0efb50066..b8fb35109 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1285,7 +1285,31 @@ def suggest_path( raise ValueError( "No machine configuration set when suggesting destination path" ) + + # Construct the full path to where the dataset is to be saved check_path = machine_config.rsync_basepath / base_path + + # Check previous year to account for the year rolling over during data collection + if not check_path.exists(): + base_path_parts = base_path.split("/") + for part in base_path_parts: + # Find the path part corresponding to the year + if len(part) == 4 and part.isdigit(): + year_idx = base_path_parts.index(part) + base_path_parts[year_idx] = str(int(part) - 1) + base_path = "/".join(base_path_parts) + check_path_prev = check_path + check_path = machine_config.rsync_basepath / base_path + + # If it's not in the previous year either, it's a genuine error + if not check_path.exists(): + log_message = ( + "Unable to find current visit folder under " + f"{str(check_path_prev)!r} or {str(check_path)!r}" + ) + log.error(log_message) + raise FileNotFoundError(log_message) + check_path_name = check_path.name while check_path.exists(): count = count + 1 if count else 2 diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index eab41858a..f1023dd84 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -1296,6 +1296,33 @@ def suggest_path( else Path(f"/dls/{get_microscope()}") / params.base_path ) check_path = check_path.parent / f"{check_path.stem}{count}{check_path.suffix}" + + # Check previous year to account for the year rolling over during data collection + if not sanitise_path(check_path).exists(): + base_path_parts = list(params.base_path.parts) + for part in base_path_parts: + # Find the path part corresponding to the year + if len(part) == 4 and part.isdigit(): + year_idx = base_path_parts.index(part) + base_path_parts[year_idx] = str(int(part) - 1) + base_path = "/".join(base_path_parts) + check_path_prev = check_path + check_path = ( + machine_config[instrument_name].rsync_basepath / base_path + if machine_config + else Path(f"/dls/{get_microscope()}") / base_path + ) + check_path = check_path.parent / f"{check_path.stem}{count}{check_path.suffix}" + + # If visit is not in the previous year either, it's a genuine error + if not check_path.exists(): + log_message = ( + "Unable to find current visit folder under " + f"{str(check_path_prev)!r} or {str(check_path)!r}" + ) + log.error(log_message) + raise FileNotFoundError(log_message) + check_path_name = check_path.name while sanitise_path(check_path).exists(): count = count + 1 if count else 2 From 97e9757558e387191b6d72125ee0e4f7ae478d1a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 6 Jan 2025 13:23:49 +0000 Subject: [PATCH 2/3] Added similar year-finding logic to the 'process_gain' API endpoint --- src/murfey/server/api/__init__.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index b8fb35109..34afd1e96 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1502,6 +1502,26 @@ async def process_gain( / secure_filename(visit_name) / machine_config.gain_directory_name ) + + # Check under previous year if the folder doesn't exist + if not filepath.exists(): + filepath_prev = filepath + filepath = ( + Path(machine_config.rsync_basepath) + / (machine_config.rsync_module or "data") + / str(datetime.datetime.now().year - 1) + / secure_filename(visit_name) + / machine_config.gain_directory_name + ) + # If it's not in the previous year, it's a genuine error + if not filepath.exists(): + log_message = ( + "Unable to find gain reference directory under " + f"{str(filepath_prev)!r} or {str(filepath)}" + ) + log.error(log_message) + raise FileNotFoundError(log_message) + if gain_reference_params.eer: new_gain_ref, new_gain_ref_superres = await prepare_eer_gain( filepath / safe_path_name, From 4635a6724531a45b9e73782001011a29054f04ad Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 6 Jan 2025 13:52:45 +0000 Subject: [PATCH 3/3] Resolved CodeQL warnings about log injection and path traversal --- src/murfey/server/demo_api.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index f1023dd84..fd20bac56 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -110,7 +110,7 @@ class Settings(BaseSettings): settings = Settings() -machine_config: dict = {} +machine_config: dict[str, MachineConfig] = {} if settings.murfey_machine_configuration: microscope = get_microscope() machine_config = from_file(Path(settings.murfey_machine_configuration), microscope) @@ -1290,12 +1290,18 @@ def suggest_path( instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name ) - check_path = ( - machine_config[instrument_name].rsync_basepath / params.base_path + rsync_basepath = ( + machine_config[instrument_name].rsync_basepath if machine_config - else Path(f"/dls/{get_microscope()}") / params.base_path + else Path(f"/dls/{get_microscope()}") ) + check_path = rsync_basepath / params.base_path check_path = check_path.parent / f"{check_path.stem}{count}{check_path.suffix}" + check_path = check_path.resolve() + + # Check for path traversal attempt + if not str(check_path).startswith(str(rsync_basepath)): + raise Exception(f"Path traversal attempt detected: {str(check_path)!r}") # Check previous year to account for the year rolling over during data collection if not sanitise_path(check_path).exists(): @@ -1307,16 +1313,17 @@ def suggest_path( base_path_parts[year_idx] = str(int(part) - 1) base_path = "/".join(base_path_parts) check_path_prev = check_path - check_path = ( - machine_config[instrument_name].rsync_basepath / base_path - if machine_config - else Path(f"/dls/{get_microscope()}") / base_path - ) + check_path = rsync_basepath / base_path check_path = check_path.parent / f"{check_path.stem}{count}{check_path.suffix}" + check_path = check_path.resolve() + + # Check for path traversal attempt + if not str(check_path).startswith(str(rsync_basepath)): + raise Exception(f"Path traversal attempt detected: {str(check_path)!r}") # If visit is not in the previous year either, it's a genuine error if not check_path.exists(): - log_message = ( + log_message = sanitise( "Unable to find current visit folder under " f"{str(check_path_prev)!r} or {str(check_path)!r}" )