From ebfc163ccb1e99a5148ec85cd5bef4148eb395fc Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 12:00:34 +0000 Subject: [PATCH 01/12] Added entry point to request for align-and-merge processing job for CLEM workflow --- pyproject.toml | 1 + src/murfey/workflows/clem/align_and_merge.py | 8 ++++++++ 2 files changed, 9 insertions(+) create mode 100644 src/murfey/workflows/clem/align_and_merge.py diff --git a/pyproject.toml b/pyproject.toml index 5f8295125..f92d3eb01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,6 +98,7 @@ murfey = "murfey.client:run" [project.entry-points."murfey.config.extraction"] "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows"] +"clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request" "process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" "process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" "register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" diff --git a/src/murfey/workflows/clem/align_and_merge.py b/src/murfey/workflows/clem/align_and_merge.py new file mode 100644 index 000000000..1cf038b92 --- /dev/null +++ b/src/murfey/workflows/clem/align_and_merge.py @@ -0,0 +1,8 @@ +""" +Script to allow Murfey to request for an image alignment, colorisation, and merge job +from cryoemservices. 
+""" + + +def submit_cluster_request(): + return True From 772992717e7e571888123f891cc75e72de7d8351 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 15:45:55 +0000 Subject: [PATCH 02/12] Added function to submit cluster align-and-merge processing request to cryoemservices --- src/murfey/workflows/clem/align_and_merge.py | 73 +++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/src/murfey/workflows/clem/align_and_merge.py b/src/murfey/workflows/clem/align_and_merge.py index 1cf038b92..dbbc44c2c 100644 --- a/src/murfey/workflows/clem/align_and_merge.py +++ b/src/murfey/workflows/clem/align_and_merge.py @@ -3,6 +3,77 @@ from cryoemservices. """ +from __future__ import annotations -def submit_cluster_request(): +from pathlib import Path +from typing import Literal, Optional + +from murfey.util.config import get_machine_config + +try: + from murfey.server.ispyb import TransportManager # Session +except AttributeError: + pass # Ignore if ISPyB credentials environment variable not set + + +def submit_cluster_request( + # Session parameters + session_id: int, + instrument_name: str, + # Processing parameters + series_name: str, + images: list[Path], + metadata: Path, + # Optional processing parameters + align_self: Optional[str] = None, + flatten: Optional[Literal["min", "max", "mean"]] = "mean", + align_across: Optional[str] = None, + # Optional session parameters + messenger: Optional[TransportManager] = None, +): + if not messenger: + raise Exception("Unable to find transport manager") + + # Load feedback queue + machine_config = get_machine_config()[instrument_name] + feedback_queue: str = machine_config.feedback_queue + + # Work out session directory from file path + processed_folder = machine_config.processed_directory_name + if not images: + raise ValueError(f"No image files have been provided for {series_name!r}") + reference_file = images[0] + path_parts = list(reference_file.parts) + path_parts[0] = "" if 
path_parts[0] == "/" else path_parts[0] + try: + root_index = path_parts.index(processed_folder) + except ValueError: + raise ValueError( + f"The processed directory {processed_folder!r} could not be found in the " + f"file path for {str(reference_file)!r}" + ) + session_dir = Path("/".join(path_parts[:root_index])) + + # Submit message to cryoemservices + messenger.send( + "processing_recipe", + { + "recipes": ["clem-align-and-merge"], + "parameters": { + # Job parameters + "series_name": series_name, + "images": [str(file) for file in images], + "metadata": str(metadata), + "align_self": ("null" if align_self is None else align_self), + "flatten": ("null" if flatten is None else flatten), + "align_across": ("null" if align_across is None else align_across), + # Other recipe parameters + "session_dir": str(session_dir), + "session_id": session_id, + "job_name": series_name, + "feedback_queue": feedback_queue, + }, + }, + new_connection=True, + ) return True From 0067b6d0084804a096c89278f22d18ed7a709079 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 15:46:46 +0000 Subject: [PATCH 03/12] Appended align-and-merge job processing request to file registration functions for LIF and TIFF files --- .../clem/register_preprocessing_results.py | 328 ++++++++++++------ 1 file changed, 225 insertions(+), 103 deletions(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index bb111f23e..9652a951b 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -17,6 +17,7 @@ from sqlalchemy.exc import NoResultFound from sqlmodel import Session, select +from murfey.server import _transport_object from murfey.util.config import get_machine_config from murfey.util.db import ( CLEMImageMetadata, @@ -27,6 +28,7 @@ ) from murfey.util.db import Session as MurfeySession from murfey.util.models import 
LIFPreprocessingResult, TIFFPreprocessingResult +from murfey.workflows.clem.align_and_merge import submit_cluster_request logger = logging.getLogger("murfey.workflows.clem.register_results") @@ -232,70 +234,130 @@ def register_lif_preprocessing_result( ) return False - # Register items in database if not already present + # Outer try-finally block for tidying up database-related section of function try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) - clem_img_series: CLEMImageSeries = get_db_entry( - db=db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, - ) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=db, - table=CLEMImageMetadata, - session_id=session_id, - file_path=result.metadata, - ) + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) - clem_lif_file: CLEMLIFFile = get_db_entry( - db=db, - table=CLEMLIFFile, - session_id=session_id, - file_path=result.parent_lif, - ) + clem_lif_file: CLEMLIFFile = get_db_entry( + db=db, + table=CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) - # Link tables to one another and populate fields - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_lif = clem_lif_file - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - clem_img_stk.stack_created = True - db.add(clem_img_stk) - db.commit() - db.refresh(clem_img_stk) + # Link tables to one another and populate fields + 
clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_lif = clem_lif_file + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh(clem_img_stk) - clem_img_series.associated_metadata = clem_metadata - clem_img_series.parent_lif = clem_lif_file - clem_img_series.number_of_members = result.number_of_members - db.add(clem_img_series) - db.commit() - db.refresh(clem_img_series) + clem_img_series.associated_metadata = clem_metadata + clem_img_series.parent_lif = clem_lif_file + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) - clem_metadata.parent_lif = clem_lif_file - db.add(clem_metadata) - db.commit() - db.refresh(clem_metadata) + clem_metadata.parent_lif = clem_lif_file + db.add(clem_metadata) + db.commit() + db.refresh(clem_metadata) + logger.info( + f"LIF preprocessing results registered for {result.series_name!r} " + f"{result.channel!r} image stack" + ) + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering LIF preprocessing result for " + f"{result.series_name!r} {result.channel!r} image stack" + ) + return False + + # Load all image stacks associated with current series from database + try: + image_stacks = [ + Path(row) + for row in db.exec( + select(CLEMImageStack.file_path).where( + CLEMImageStack.series_id == clem_img_series.id + ) + ).all() + ] + logger.debug( + f"Found the following images: {[str(file) for file in image_stacks]}" + ) + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Error requesting data from database for {result.series_name!r} series" + ) + return False + + # Check if all image stacks for this series are 
accounted for + if not len(image_stacks) == clem_img_series.number_of_members: + logger.info( + f"Members of the series {result.series_name!r} are still missing; " + "the next stage of processing will not be triggered yet" + ) + return True + + # Request for next stage of processing if all members are present + cluster_response = submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=result.series_name, + images=image_stacks, + metadata=result.metadata, + align_self=None, + flatten="mean", + align_across=None, + messenger=_transport_object, + ) + if cluster_response is False: + logger.error( + "Error requesting align-and-merge processing job for " + f"{result.series_name!r} series" + ) + return False logger.info( - f"LIF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + "Successfully requested align-and-merge processing job for " + f"{result.series_name!r} series" ) return True - except Exception: - logger.error(traceback.format_exc()) - logger.error( - f"Exception encountered when registering LIF preprocessing result for {result.series_name!r} {result.channel!r} image stack" - ) - return False - finally: db.close() @@ -330,68 +392,128 @@ def register_tiff_preprocessing_result( ) return False - # Register items in database if not already present + # Outer try-finally block for tidying up database-related section of function try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) - clem_img_series: CLEMImageSeries = get_db_entry( - db=db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, - ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=db, - table=CLEMImageMetadata, - session_id=session_id, - file_path=result.metadata, - ) - - # Link tables to one another and populate fields - # Register TIFF files and populate them iteratively first - for file in 
result.parent_tiffs: - clem_tiff_file: CLEMTIFFFile = get_db_entry( + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: CLEMImageMetadata = get_db_entry( db=db, - table=CLEMTIFFFile, + table=CLEMImageMetadata, session_id=session_id, - file_path=file, + file_path=result.metadata, ) - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - db.add(clem_tiff_file) + + # Link tables to one another and populate fields + # Register TIFF files and populate them iteratively first + for file in result.parent_tiffs: + clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + db.add(clem_tiff_file) + db.commit() + db.refresh(clem_tiff_file) + + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) db.commit() - db.refresh(clem_tiff_file) + db.refresh(clem_img_stk) - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - clem_img_stk.stack_created = True - db.add(clem_img_stk) - db.commit() - db.refresh(clem_img_stk) + clem_img_series.associated_metadata = clem_metadata + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) - clem_img_series.associated_metadata = 
clem_metadata - clem_img_series.number_of_members = result.number_of_members - db.add(clem_img_series) - db.commit() - db.refresh(clem_img_series) + logger.info( + f"TIFF preprocessing results registered for {result.series_name!r} " + f"{result.channel!r} image stack" + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering TIFF preprocessing result for " + f"{result.series_name!r} {result.channel!r} image stack" + ) + return False + + # Load all image stacks associated with current series from database + try: + image_stacks = [ + Path(row) + for row in db.exec( + select(CLEMImageStack.file_path).where( + CLEMImageStack.series_id == clem_img_series.id + ) + ).all() + ] + logger.debug( + f"Found the following images: {[str(file) for file in image_stacks]}" + ) + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Error requesting data from database for {result.series_name!r} series" + ) + return False + + # Check if all image stacks for this series are accounted for + if not len(image_stacks) == clem_img_series.number_of_members: + logger.info( + f"Members of the series {result.series_name!r} are still missing; " + "the next stage of processing will not be triggered yet" + ) + return True + + # Request for next stage of processing if all members are present + cluster_response = submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=result.series_name, + images=image_stacks, + metadata=result.metadata, + align_self=None, + flatten="mean", + align_across=None, + messenger=_transport_object, + ) + if cluster_response is False: + logger.error( + "Error requesting align-and-merge processing job for " + f"{result.series_name!r} series" + ) + return False logger.info( - f"TIFF preprocessing results registered for 
{result.series_name!r} {result.channel!r} image stack" + "Successfully requested align-and-merge processing job for " + f"{result.series_name!r} series" ) return True - except Exception: - logger.error(traceback.format_exc()) - logger.error( - f"Exception encountered when registering TIFF preprocessing result for {result.series_name!r} {result.channel!r} image stack" - ) - return False - finally: db.close() From 6ec4f38af584f76b5f394528928c07aee40b7a6a Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 15:59:05 +0000 Subject: [PATCH 04/12] Moved LIF and TIFF registration Pydantic models to the files where they're used --- src/murfey/util/models.py | 37 +------------------ .../clem/register_preprocessing_results.py | 37 ++++++++++++++++++- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 7af87f754..1067e471e 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -1,11 +1,10 @@ from __future__ import annotations -from ast import literal_eval from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from pydantic import BaseModel, validator +from pydantic import BaseModel """ General Models @@ -161,40 +160,6 @@ class TIFFSeriesInfo(BaseModel): series_metadata: Path -class LIFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path - series_name: str - channel: str - number_of_members: int - parent_lif: Path - - -class TIFFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path - series_name: str - channel: str - number_of_members: int - parent_tiffs: list[Path] - - @validator( - "parent_tiffs", - pre=True, - ) - def parse_stringified_list(cls, value): - if isinstance(value, str): - try: - eval_result = literal_eval(value) - if isinstance(eval_result, list): - parent_tiffs = [Path(p) for p in eval_result] - return parent_tiffs - except (SyntaxError, ValueError): - raise ValueError("Unable to 
parse input") - # Return value as-is; if it fails, it fails - return value - - """ FIB === diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 9652a951b..62834e47e 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -11,9 +11,11 @@ import logging import re import traceback +from ast import literal_eval from pathlib import Path from typing import Optional, Type, Union +from pydantic import BaseModel, validator from sqlalchemy.exc import NoResultFound from sqlmodel import Session, select @@ -27,7 +29,6 @@ CLEMTIFFFile, ) from murfey.util.db import Session as MurfeySession -from murfey.util.models import LIFPreprocessingResult, TIFFPreprocessingResult from murfey.workflows.clem.align_and_merge import submit_cluster_request logger = logging.getLogger("murfey.workflows.clem.register_results") @@ -194,6 +195,15 @@ def get_db_entry( return db_entry +class LIFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_lif: Path + + def register_lif_preprocessing_result( message: dict, db: Session, demo: bool = False ) -> bool: @@ -362,6 +372,31 @@ def register_lif_preprocessing_result( db.close() +class TIFFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_tiffs: list[Path] + + @validator( + "parent_tiffs", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value + + def register_tiff_preprocessing_result( message: dict, 
db: Session, demo: bool = False ) -> bool: From f483d4b61874036350b803339df9ac49fb1ffc3c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 16:07:48 +0000 Subject: [PATCH 05/12] Moved CLEM database functions to __init__, since they will be shared with other sections of this workflow --- src/murfey/workflows/clem/__init__.py | 187 ++++++++++++++++++ .../clem/register_preprocessing_results.py | 166 +--------------- 2 files changed, 188 insertions(+), 165 deletions(-) diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index e69de29bb..789fbabbe 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import logging +import re +from pathlib import Path +from typing import Optional, Type, Union + +from sqlalchemy.exc import NoResultFound +from sqlmodel import Session, select + +from murfey.util.config import get_machine_config +from murfey.util.db import ( + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +) +from murfey.util.db import Session as MurfeySession + +logger = logging.getLogger("murfey.workflows.clem") + + +""" +HELPER FUNCTIONS FOR CLEM DATABASE +""" + + +def _validate_and_sanitise( + file: Path, + session_id: int, + db: Session, +) -> Path: + """ + Performs validation and sanitisation on the incoming file paths, ensuring that + no forbidden characters are present and that the the path points only to allowed + sections of the file server. + + Returns the file path as a sanitised string that can be converted into a Path + object again. + + NOTE: Due to the instrument name query, 'db' now needs to be passed as an + explicit variable to this function from within a FastAPI endpoint, as using the + instance that was imported directly won't load it in the correct state. 
+ """ + + valid_file_types = ( + ".lif", + ".tif", + ".tiff", + ".xlif", + ".xml", + ) + + # Resolve symlinks and directory changes to get full file path + full_path = Path(file).resolve() + + # Use machine configuration to validate which file base paths are accepted from + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + rsync_basepath = machine_config.rsync_basepath + try: + base_path = list(rsync_basepath.parents)[-2].as_posix() + except IndexError: + logger.warning(f"Base path {rsync_basepath!r} is too short") + base_path = rsync_basepath.as_posix() + except Exception as e: + raise Exception( + f"Unexpected exception encountered when loading the file base path: {e}" + ) + + # Check that full file path doesn't contain unallowed characters + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: + raise ValueError(f"Unallowed characters present in {file}") + + # Check that it's not accessing somehwere it's not allowed + if not str(full_path).startswith(str(base_path)): + raise ValueError(f"{file} points to a directory that is not permitted") + + # Check that it's a file, not a directory + if full_path.is_file() is False: + raise ValueError(f"{file} is not a file") + + # Check that it is of a permitted file type + if f"{full_path.suffix}" not in valid_file_types: + raise ValueError(f"{full_path.suffix} is not a permitted file format") + + return full_path + + +def get_db_entry( + db: Session, + # With the database search funcion having been moved out of the FastAPI + # endpoint, the database now has to be explicitly passed within the FastAPI + # endpoint function in order for it to be loaded in the correct state. 
+ table: Type[ + Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, + ] + ], + session_id: int, + file_path: Optional[Path] = None, + series_name: Optional[str] = None, +) -> Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +]: + """ + Searches the CLEM workflow-related tables in the Murfey database for an entry that + matches the file path or series name within a given session. Returns the entry if + a match is found, otherwise register it as a new entry in the database. + """ + + # Validate that parameters are provided correctly + if file_path is None and series_name is None: + raise ValueError( + "One of either 'file_path' or 'series_name' has to be provided" + ) + if file_path is not None and series_name is not None: + raise ValueError("Only one of 'file_path' or 'series_name' should be provided") + + # Validate file path if provided + if file_path is not None: + try: + file_path = _validate_and_sanitise(file_path, session_id, db) + except Exception: + raise Exception + + # Validate series name to use + if series_name is not None: + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: + raise ValueError("One or more characters in the string are not permitted") + + # Return database entry if it exists + try: + db_entry = ( + db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.file_path == str(file_path)) + ).one() + if file_path is not None + else db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.series_name == series_name) + ).one() + ) + # Create and register new entry if not present + except NoResultFound: + db_entry = ( + table( + file_path=str(file_path), + session_id=session_id, + ) + if file_path is not None + else table( + series_name=series_name, + session_id=session_id, + ) + ) + db.add(db_entry) + db.commit() + db.refresh(db_entry) + except Exception: + raise Exception + + return db_entry 
diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index 62834e47e..c2d8ce700 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -9,18 +9,14 @@ import json import logging -import re import traceback from ast import literal_eval from pathlib import Path -from typing import Optional, Type, Union from pydantic import BaseModel, validator -from sqlalchemy.exc import NoResultFound from sqlmodel import Session, select from murfey.server import _transport_object -from murfey.util.config import get_machine_config from murfey.util.db import ( CLEMImageMetadata, CLEMImageSeries, @@ -29,172 +25,12 @@ CLEMTIFFFile, ) from murfey.util.db import Session as MurfeySession +from murfey.workflows.clem import get_db_entry from murfey.workflows.clem.align_and_merge import submit_cluster_request logger = logging.getLogger("murfey.workflows.clem.register_results") -def _validate_and_sanitise( - file: Path, - session_id: int, - db: Session, -) -> Path: - """ - Performs validation and sanitisation on the incoming file paths, ensuring that - no forbidden characters are present and that the the path points only to allowed - sections of the file server. - - Returns the file path as a sanitised string that can be converted into a Path - object again. - - NOTE: Due to the instrument name query, 'db' now needs to be passed as an - explicit variable to this function from within a FastAPI endpoint, as using the - instance that was imported directly won't load it in the correct state. 
- """ - - valid_file_types = ( - ".lif", - ".tif", - ".tiff", - ".xlif", - ".xml", - ) - - # Resolve symlinks and directory changes to get full file path - full_path = Path(file).resolve() - - # Use machine configuration to validate which file base paths are accepted from - instrument_name = ( - db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) - .one() - .instrument_name - ) - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - rsync_basepath = machine_config.rsync_basepath - try: - base_path = list(rsync_basepath.parents)[-2].as_posix() - except IndexError: - logger.warning(f"Base path {rsync_basepath!r} is too short") - base_path = rsync_basepath.as_posix() - except Exception as e: - raise Exception( - f"Unexpected exception encountered when loading the file base path: {e}" - ) - - # Check that full file path doesn't contain unallowed characters - # Currently allows only: - # - words (alphanumerics and "_"; \w), - # - spaces (\s), - # - periods, - # - dashes, - # - forward slashes ("/") - if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: - raise ValueError(f"Unallowed characters present in {file}") - - # Check that it's not accessing somehwere it's not allowed - if not str(full_path).startswith(str(base_path)): - raise ValueError(f"{file} points to a directory that is not permitted") - - # Check that it's a file, not a directory - if full_path.is_file() is False: - raise ValueError(f"{file} is not a file") - - # Check that it is of a permitted file type - if f"{full_path.suffix}" not in valid_file_types: - raise ValueError(f"{full_path.suffix} is not a permitted file format") - - return full_path - - -def get_db_entry( - db: Session, - # With the database search funcion having been moved out of the FastAPI - # endpoint, the database now has to be explicitly passed within the FastAPI - # endpoint function in order for it to be loaded in the correct state. 
- table: Type[ - Union[ - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, - ] - ], - session_id: int, - file_path: Optional[Path] = None, - series_name: Optional[str] = None, -) -> Union[ - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, -]: - """ - Searches the CLEM workflow-related tables in the Murfey database for an entry that - matches the file path or series name within a given session. Returns the entry if - a match is found, otherwise register it as a new entry in the database. - """ - - # Validate that parameters are provided correctly - if file_path is None and series_name is None: - raise ValueError( - "One of either 'file_path' or 'series_name' has to be provided" - ) - if file_path is not None and series_name is not None: - raise ValueError("Only one of 'file_path' or 'series_name' should be provided") - - # Validate file path if provided - if file_path is not None: - try: - file_path = _validate_and_sanitise(file_path, session_id, db) - except Exception: - raise Exception - - # Validate series name to use - if series_name is not None: - if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: - raise ValueError("One or more characters in the string are not permitted") - - # Return database entry if it exists - try: - db_entry = ( - db.exec( - select(table) - .where(table.session_id == session_id) - .where(table.file_path == str(file_path)) - ).one() - if file_path is not None - else db.exec( - select(table) - .where(table.session_id == session_id) - .where(table.series_name == series_name) - ).one() - ) - # Create and register new entry if not present - except NoResultFound: - db_entry = ( - table( - file_path=str(file_path), - session_id=session_id, - ) - if file_path is not None - else table( - series_name=series_name, - session_id=session_id, - ) - ) - db.add(db_entry) - db.commit() - db.refresh(db_entry) - except Exception: - raise Exception - - return db_entry - 
- class LIFPreprocessingResult(BaseModel): image_stack: Path metadata: Path From 33df43ff9729f6528d9449319f97d73f9b72dc3e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 16:22:36 +0000 Subject: [PATCH 06/12] Corrected logger name --- src/murfey/workflows/clem/register_preprocessing_results.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index c2d8ce700..35d4b7d9e 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -28,7 +28,7 @@ from murfey.workflows.clem import get_db_entry from murfey.workflows.clem.align_and_merge import submit_cluster_request -logger = logging.getLogger("murfey.workflows.clem.register_results") +logger = logging.getLogger("murfey.workflows.clem.register_preprocessing_results") class LIFPreprocessingResult(BaseModel): From 9f0cff1e69ef47fff6b135e443c94ff1f573fd25 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 16:36:47 +0000 Subject: [PATCH 07/12] Added entry point to register CLEM align-and-merge results --- pyproject.toml | 1 + .../clem/register_align_and_merge_results.py | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 src/murfey/workflows/clem/register_align_and_merge_results.py diff --git a/pyproject.toml b/pyproject.toml index f92d3eb01..e15397403 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,6 +99,7 @@ murfey = "murfey.client:run" "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows"] "clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request" +"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" "process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" 
"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" "register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py new file mode 100644 index 000000000..0e269c8f5 --- /dev/null +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel + +logger = logging.getLogger("murfey.workflows.clem.register_align_and_merge_results") + + +class AlignAndMergeResult(BaseModel): + series_name: str + image_stacks: list[Path] + align_self: Optional[str] = None + flatten: Optional[str] = "mean" + align_across: Optional[str] = None + composite_image: Path + + +def register_align_and_merge_result(): + return True From 6e9c67d041d684a2731adbb993e0703fa7baa332 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 16:39:38 +0000 Subject: [PATCH 08/12] Added 'clem.' 
to the start of entry point names for other CLEM workflows --- pyproject.toml | 8 ++++---- src/murfey/server/api/clem.py | 8 ++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e15397403..06c6b96cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,11 +99,11 @@ murfey = "murfey.client:run" "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows"] "clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request" +"clem.process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" +"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" "clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" -"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" -"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" -"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" -"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" +"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" +"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 0f948c36f..17cdd6f1d 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -629,7 +629,9 @@ def process_raw_lifs( try: # Try and load relevant Murfey workflow workflow: EntryPoint = list( - entry_points().select(group="murfey.workflows", name="process_raw_lifs") + 
entry_points().select( + group="murfey.workflows", name="clem.process_raw_lifs" + ) )[0] except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") @@ -661,7 +663,9 @@ def process_raw_tiffs( try: # Try and load relevant Murfey workflow workflow: EntryPoint = list( - entry_points().select(group="murfey.workflows", name="process_raw_tiffs") + entry_points().select( + group="murfey.workflows", name="clem.process_raw_tiffs" + ) )[0] except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") From a309a6eb9d202e386ab52d1a5e65b1968446c005 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 17:51:43 +0000 Subject: [PATCH 09/12] Added comments about planned future work to CLEM tables --- src/murfey/util/db.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index e9bcb926e..9a7584f28 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -269,6 +269,10 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore ) # One to many # Process checklist for series + # FIXME: Columns need to be updated now that workflow is clearer + # - Image alignment to happen in two stages: + # 1. Align to self, then + # 2. Align to a reference stack number_of_members: int = ( 0 # Expected number of image stacks belonging to this series ) @@ -323,6 +327,11 @@ class CLEMImageStack(SQLModel, table=True): # type: ignore ) # Process checklist for each image + # FIXME: Columns need to be updated now that workflow is clearer + # - Image registration to happen in two stages: + # 1. Align to itself, then + # 2. 
Align to a reference image + # - Individual RGB images don't need to be created; columns can be removed stack_created: bool = False # Verify that the stack has been created image_aligned: bool = False # Verify that image alignment has been done on stack aligned_image: Optional[str] = None # Full path to aligned image stack From f9dc8137d1acda7bc1f4baf4a25a24934abe340d Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 26 Nov 2024 17:52:23 +0000 Subject: [PATCH 10/12] Added entry point to register align-and-merge processing results --- .../clem/register_align_and_merge_results.py | 104 +++++++++++++++++- 1 file changed, 101 insertions(+), 3 deletions(-) diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py index 0e269c8f5..40d5b3946 100644 --- a/src/murfey/workflows/clem/register_align_and_merge_results.py +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -1,10 +1,17 @@ from __future__ import annotations +import json import logging +import traceback +from ast import literal_eval from pathlib import Path from typing import Optional -from pydantic import BaseModel +from pydantic import BaseModel, validator +from sqlmodel import Session + +from murfey.util.db import CLEMImageSeries +from murfey.workflows.clem import get_db_entry logger = logging.getLogger("murfey.workflows.clem.register_align_and_merge_results") @@ -17,6 +24,97 @@ class AlignAndMergeResult(BaseModel): align_across: Optional[str] = None composite_image: Path + @validator( + "image_stacks", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value + + +def 
register_align_and_merge_result( + message: dict, db: Session, demo: bool = False +) -> bool: + """ + session_id (recipe) + register (wrapper) + result (wrapper) + key1 + key2 + ... + """ + + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + + # Validate message and try and load results + if isinstance(message["result"], str): + try: + json_obj: dict = json.loads(message["result"]) + result = AlignAndMergeResult(**json_obj) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when parsing align-and-merge processing result" + ) + return False + elif isinstance(message["result"], dict): + try: + result = AlignAndMergeResult(**message["result"]) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when parsing align-and-merge processing result" + ) + return False + else: + logger.error( + "Invalid type for align-and-merge processing result: " + f"{type(message['result'])}" + ) + return False + + # Outer try-finally block for tidying up database-related section of function + try: + # Register items in database if not already present + try: + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_img_series.composite_image = str(result.composite_image) + clem_img_series.composite_created = True + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + logger.info( + "Align-and-merge processing result registered for " + f"{result.series_name!r} series" + ) + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering align-and-merge result for " + f"{result.series_name!r} series" + ) + return False -def register_align_and_merge_result(): - return True + return True + finally: + db.close() From
35316153f9e6d73a8a6cb0e1fcc3b21a845106f7 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 29 Nov 2024 15:29:37 +0000 Subject: [PATCH 11/12] Removed #FIXME comments --- src/murfey/util/db.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 9a7584f28..e9bcb926e 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -269,10 +269,6 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore ) # One to many # Process checklist for series - # FIXME: Columns need to be updated now that workflow is clearer - # - Image alignment to happen in two stages: - # 1. Align to self, then - # 2. Align to a reference stack number_of_members: int = ( 0 # Expected number of image stacks belonging to this series ) @@ -327,11 +323,6 @@ class CLEMImageStack(SQLModel, table=True): # type: ignore ) # Process checklist for each image - # FIXME: Columns need to be updated now that workflow is clearer - # - Image registration to happen in two stages: - # 1. Align to itself, then - # 2. 
Align to a reference image - # - Individual RGB images don't need to be created; columns can be removed stack_created: bool = False # Verify that the stack has been created image_aligned: bool = False # Verify that image alignment has been done on stack aligned_image: Optional[str] = None # Full path to aligned image stack From 38ae39ec94c224bdd7f1f1b23523940c0f2ec8af Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 29 Nov 2024 15:34:59 +0000 Subject: [PATCH 12/12] No need to convert 'None' to 'null' when sending message to trigger processing --- src/murfey/workflows/clem/align_and_merge.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/murfey/workflows/clem/align_and_merge.py b/src/murfey/workflows/clem/align_and_merge.py index dbbc44c2c..5c02e6e0f 100644 --- a/src/murfey/workflows/clem/align_and_merge.py +++ b/src/murfey/workflows/clem/align_and_merge.py @@ -64,9 +64,9 @@ def submit_cluster_request( "series_name": series_name, "images": [str(file) for file in images], "metadata": str(metadata), - "align_self": ("null" if align_self is None else align_self), - "flatten": ("null" if flatten is None else flatten), - "align_across": ("null" if align_across is None else align_across), + "align_self": align_self, + "flatten": flatten, + "align_across": align_across, # Other recipe parameters "session_dir": str(session_dir), "session_id": session_id,