diff --git a/pyproject.toml b/pyproject.toml index 5f8295125..06c6b96cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,10 +98,12 @@ murfey = "murfey.client:run" [project.entry-points."murfey.config.extraction"] "murfey_machine" = "murfey.util.config:get_extended_machine_config" [project.entry-points."murfey.workflows"] -"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" -"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" -"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" -"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" +"clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request" +"clem.process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request" +"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request" +"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" +"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" +"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/server/api/clem.py b/src/murfey/server/api/clem.py index 0f948c36f..17cdd6f1d 100644 --- a/src/murfey/server/api/clem.py +++ b/src/murfey/server/api/clem.py @@ -629,7 +629,9 @@ def process_raw_lifs( try: # Try and load relevant Murfey workflow workflow: EntryPoint = list( - entry_points().select(group="murfey.workflows", name="process_raw_lifs") + entry_points().select( + group="murfey.workflows", name="clem.process_raw_lifs" + ) )[0] except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") @@ -661,7 +663,9 @@ def process_raw_tiffs( try: # Try and load relevant Murfey workflow workflow: EntryPoint = list( - entry_points().select(group="murfey.workflows", name="process_raw_tiffs") + entry_points().select( + group="murfey.workflows", name="clem.process_raw_tiffs" + ) )[0] except IndexError: raise RuntimeError("The relevant Murfey workflow was not found") diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index 7af87f754..1067e471e 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -1,11 +1,10 @@ from __future__ import annotations -from ast import literal_eval from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from pydantic import BaseModel, validator +from pydantic import BaseModel """ General Models @@ -161,40 +160,6 @@ class TIFFSeriesInfo(BaseModel): series_metadata: Path -class LIFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path - series_name: str - channel: str - number_of_members: int - parent_lif: Path - - -class TIFFPreprocessingResult(BaseModel): - image_stack: Path - metadata: Path - series_name: str - channel: str - number_of_members: int - parent_tiffs: list[Path] - - @validator( - "parent_tiffs", - pre=True, - ) - def parse_stringified_list(cls, value): - if isinstance(value, str): - try: - eval_result = literal_eval(value) - if isinstance(eval_result, list): - parent_tiffs = [Path(p) for p in eval_result] - return parent_tiffs - except (SyntaxError, ValueError): - raise ValueError("Unable to parse input") - # Return value as-is; if it fails, it fails - return value - - """ FIB === diff --git a/src/murfey/workflows/clem/__init__.py b/src/murfey/workflows/clem/__init__.py index e69de29bb..789fbabbe 100644 --- a/src/murfey/workflows/clem/__init__.py +++ b/src/murfey/workflows/clem/__init__.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import logging +import re +from pathlib import Path +from typing import Optional, Type, Union + +from sqlalchemy.exc import NoResultFound +from sqlmodel import Session, select + +from murfey.util.config import get_machine_config +from murfey.util.db import ( + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +) +from murfey.util.db import Session as MurfeySession + +logger = logging.getLogger("murfey.workflows.clem") + + +""" +HELPER FUNCTIONS FOR CLEM DATABASE +""" + + +def _validate_and_sanitise( + file: Path, + session_id: int, + db: Session, +) -> Path: + """ + Performs validation and sanitisation on the incoming file paths, ensuring that + no forbidden characters are present and that the the path points only to allowed + sections of the file server. + + Returns the file path as a sanitised string that can be converted into a Path + object again. + + NOTE: Due to the instrument name query, 'db' now needs to be passed as an + explicit variable to this function from within a FastAPI endpoint, as using the + instance that was imported directly won't load it in the correct state. + """ + + valid_file_types = ( + ".lif", + ".tif", + ".tiff", + ".xlif", + ".xml", + ) + + # Resolve symlinks and directory changes to get full file path + full_path = Path(file).resolve() + + # Use machine configuration to validate which file base paths are accepted from + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + rsync_basepath = machine_config.rsync_basepath + try: + base_path = list(rsync_basepath.parents)[-2].as_posix() + except IndexError: + logger.warning(f"Base path {rsync_basepath!r} is too short") + base_path = rsync_basepath.as_posix() + except Exception as e: + raise Exception( + f"Unexpected exception encountered when loading the file base path: {e}" + ) + + # Check that full file path doesn't contain unallowed characters + # Currently allows only: + # - words (alphanumerics and "_"; \w), + # - spaces (\s), + # - periods, + # - dashes, + # - forward slashes ("/") + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: + raise ValueError(f"Unallowed characters present in {file}") + + # Check that it's not accessing somehwere it's not allowed + if not str(full_path).startswith(str(base_path)): + raise ValueError(f"{file} points to a directory that is not permitted") + + # Check that it's a file, not a directory + if full_path.is_file() is False: + raise ValueError(f"{file} is not a file") + + # Check that it is of a permitted file type + if f"{full_path.suffix}" not in valid_file_types: + raise ValueError(f"{full_path.suffix} is not a permitted file format") + + return full_path + + +def get_db_entry( + db: Session, + # With the database search funcion having been moved out of the FastAPI + # endpoint, the database now has to be explicitly passed within the FastAPI + # endpoint function in order for it to be loaded in the correct state. + table: Type[ + Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, + ] + ], + session_id: int, + file_path: Optional[Path] = None, + series_name: Optional[str] = None, +) -> Union[ + CLEMImageMetadata, + CLEMImageSeries, + CLEMImageStack, + CLEMLIFFile, + CLEMTIFFFile, +]: + """ + Searches the CLEM workflow-related tables in the Murfey database for an entry that + matches the file path or series name within a given session. Returns the entry if + a match is found, otherwise register it as a new entry in the database. + """ + + # Validate that parameters are provided correctly + if file_path is None and series_name is None: + raise ValueError( + "One of either 'file_path' or 'series_name' has to be provided" + ) + if file_path is not None and series_name is not None: + raise ValueError("Only one of 'file_path' or 'series_name' should be provided") + + # Validate file path if provided + if file_path is not None: + try: + file_path = _validate_and_sanitise(file_path, session_id, db) + except Exception: + raise Exception + + # Validate series name to use + if series_name is not None: + if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: + raise ValueError("One or more characters in the string are not permitted") + + # Return database entry if it exists + try: + db_entry = ( + db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.file_path == str(file_path)) + ).one() + if file_path is not None + else db.exec( + select(table) + .where(table.session_id == session_id) + .where(table.series_name == series_name) + ).one() + ) + # Create and register new entry if not present + except NoResultFound: + db_entry = ( + table( + file_path=str(file_path), + session_id=session_id, + ) + if file_path is not None + else table( + series_name=series_name, + session_id=session_id, + ) + ) + db.add(db_entry) + db.commit() + db.refresh(db_entry) + except Exception: + raise Exception + + return db_entry diff --git a/src/murfey/workflows/clem/align_and_merge.py b/src/murfey/workflows/clem/align_and_merge.py new file mode 100644 index 000000000..5c02e6e0f --- /dev/null +++ b/src/murfey/workflows/clem/align_and_merge.py @@ -0,0 +1,79 @@ +""" +Script to allow Murfey to request for an image alignment, colorisation, and merge job +from cryoemservices. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Literal, Optional + +from murfey.util.config import get_machine_config + +try: + from murfey.server.ispyb import TransportManager # Session +except AttributeError: + pass # Ignore if ISPyB credentials environment variable not set + + +def submit_cluster_request( + # Session parameters + session_id: int, + instrument_name: str, + # Processing parameters + series_name: str, + images: list[Path], + metadata: Path, + # Optional processing parameters + align_self: Optional[str] = None, + flatten: Optional[Literal["min", "max", "mean"]] = "mean", + align_across: Optional[str] = None, + # Optional session parameters + messenger: Optional[TransportManager] = None, +): + if not messenger: + raise Exception("Unable to find transport manager") + + # Load feedback queue + machine_config = get_machine_config()[instrument_name] + feedback_queue: str = machine_config.feedback_queue + + # Work out session directory from file path + processed_folder = machine_config.processed_directory_name + if not images: + raise ValueError(f"No image files have been provided for {series_name!r}") + reference_file = images[0] + path_parts = list(reference_file.parts) + path_parts[0] = "" if path_parts[0] == "/" else path_parts[0] + try: + root_index = path_parts.index(processed_folder) + except ValueError: + raise ValueError( + f"The processed directory {processed_folder!r} could not be found in the " + f"file path for {str(reference_file)!r}" + ) + session_dir = Path("/".join(path_parts[:root_index])) + + # Submit message to cryoemservices + messenger.send( + "processing_recipe", + { + "recipes": ["clem-align-and-merge"], + "parameters": { + # Job parameters + "series_name": series_name, + "images": [str(file) for file in images], + "metadata": str(metadata), + "align_self": align_self, + "flatten": flatten, + "align_across": align_across, + # Other recipe parameters + "session_dir": str(session_dir), + "session_id": session_id, + "job_name": series_name, + "feedback_queue": feedback_queue, + }, + }, + new_connection=True, + ) + return True diff --git a/src/murfey/workflows/clem/register_align_and_merge_results.py b/src/murfey/workflows/clem/register_align_and_merge_results.py new file mode 100644 index 000000000..40d5b3946 --- /dev/null +++ b/src/murfey/workflows/clem/register_align_and_merge_results.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +import json +import logging +import traceback +from ast import literal_eval +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel, validator +from sqlmodel import Session + +from murfey.util.db import CLEMImageSeries +from murfey.workflows.clem import get_db_entry + +logger = logging.getLogger("murfey.workflows.clem.register_align_and_merge_results") + + +class AlignAndMergeResult(BaseModel): + series_name: str + image_stacks: list[Path] + align_self: Optional[str] = None + flatten: Optional[str] = "mean" + align_across: Optional[str] = None + composite_image: Path + + @validator( + "image_stacks", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value + + +def register_align_and_merge_result( + message: dict, db: Session, demo: bool = False +) -> bool: + """ + session_id (recipe) + register (wrapper) + result (wrapper) + key1 + key2 + ... + """ + + session_id: int = ( + int(message["session_id"]) + if not isinstance(message["session_id"], int) + else message["session_id"] + ) + + # Validate message and try and load results + if isinstance(message["result"], str): + try: + json_obj: dict = json.loads(message["result"]) + result = AlignAndMergeResult(**json_obj) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when parsing align-and-merge processing result" + ) + return False + elif isinstance(message["result"], dict): + try: + result = AlignAndMergeResult(**message["result"]) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when parsing align-and-merge processing result" + ) + return False + else: + logger.error( + "Invalid type for align-and-merge processing result: " + f"{type(message['result'])}" + ) + return False + + # Outer try-finally block for tidying up database-related section of function + try: + # Register items in database if not already present + try: + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_img_series.composite_image = str(result.composite_image) + clem_img_series.composite_created = True + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + logger.info( + "Align-and-merge processing result registered for " + f"{result.series_name!r} series" + ) + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering LIF preprocessing result for " + f"{result.series_name!r} {result.channel!r} image stack" + ) + return False + + return True + finally: + db.close() diff --git a/src/murfey/workflows/clem/register_preprocessing_results.py b/src/murfey/workflows/clem/register_preprocessing_results.py index bb111f23e..35d4b7d9e 100644 --- a/src/murfey/workflows/clem/register_preprocessing_results.py +++ b/src/murfey/workflows/clem/register_preprocessing_results.py @@ -9,15 +9,14 @@ import json import logging -import re import traceback +from ast import literal_eval from pathlib import Path -from typing import Optional, Type, Union -from sqlalchemy.exc import NoResultFound +from pydantic import BaseModel, validator from sqlmodel import Session, select -from murfey.util.config import get_machine_config +from murfey.server import _transport_object from murfey.util.db import ( CLEMImageMetadata, CLEMImageSeries, @@ -26,170 +25,19 @@ CLEMTIFFFile, ) from murfey.util.db import Session as MurfeySession -from murfey.util.models import LIFPreprocessingResult, TIFFPreprocessingResult +from murfey.workflows.clem import get_db_entry +from murfey.workflows.clem.align_and_merge import submit_cluster_request -logger = logging.getLogger("murfey.workflows.clem.register_results") +logger = logging.getLogger("murfey.workflows.clem.register_preprocessing_results") -def _validate_and_sanitise( - file: Path, - session_id: int, - db: Session, -) -> Path: - """ - Performs validation and sanitisation on the incoming file paths, ensuring that - no forbidden characters are present and that the the path points only to allowed - sections of the file server. - - Returns the file path as a sanitised string that can be converted into a Path - object again. - - NOTE: Due to the instrument name query, 'db' now needs to be passed as an - explicit variable to this function from within a FastAPI endpoint, as using the - instance that was imported directly won't load it in the correct state. - """ - - valid_file_types = ( - ".lif", - ".tif", - ".tiff", - ".xlif", - ".xml", - ) - - # Resolve symlinks and directory changes to get full file path - full_path = Path(file).resolve() - - # Use machine configuration to validate which file base paths are accepted from - instrument_name = ( - db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) - .one() - .instrument_name - ) - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - rsync_basepath = machine_config.rsync_basepath - try: - base_path = list(rsync_basepath.parents)[-2].as_posix() - except IndexError: - logger.warning(f"Base path {rsync_basepath!r} is too short") - base_path = rsync_basepath.as_posix() - except Exception as e: - raise Exception( - f"Unexpected exception encountered when loading the file base path: {e}" - ) - - # Check that full file path doesn't contain unallowed characters - # Currently allows only: - # - words (alphanumerics and "_"; \w), - # - spaces (\s), - # - periods, - # - dashes, - # - forward slashes ("/") - if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False: - raise ValueError(f"Unallowed characters present in {file}") - - # Check that it's not accessing somehwere it's not allowed - if not str(full_path).startswith(str(base_path)): - raise ValueError(f"{file} points to a directory that is not permitted") - - # Check that it's a file, not a directory - if full_path.is_file() is False: - raise ValueError(f"{file} is not a file") - - # Check that it is of a permitted file type - if f"{full_path.suffix}" not in valid_file_types: - raise ValueError(f"{full_path.suffix} is not a permitted file format") - - return full_path - - -def get_db_entry( - db: Session, - # With the database search funcion having been moved out of the FastAPI - # endpoint, the database now has to be explicitly passed within the FastAPI - # endpoint function in order for it to be loaded in the correct state. - table: Type[ - Union[ - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, - ] - ], - session_id: int, - file_path: Optional[Path] = None, - series_name: Optional[str] = None, -) -> Union[ - CLEMImageMetadata, - CLEMImageSeries, - CLEMImageStack, - CLEMLIFFile, - CLEMTIFFFile, -]: - """ - Searches the CLEM workflow-related tables in the Murfey database for an entry that - matches the file path or series name within a given session. Returns the entry if - a match is found, otherwise register it as a new entry in the database. - """ - - # Validate that parameters are provided correctly - if file_path is None and series_name is None: - raise ValueError( - "One of either 'file_path' or 'series_name' has to be provided" - ) - if file_path is not None and series_name is not None: - raise ValueError("Only one of 'file_path' or 'series_name' should be provided") - - # Validate file path if provided - if file_path is not None: - try: - file_path = _validate_and_sanitise(file_path, session_id, db) - except Exception: - raise Exception - - # Validate series name to use - if series_name is not None: - if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False: - raise ValueError("One or more characters in the string are not permitted") - - # Return database entry if it exists - try: - db_entry = ( - db.exec( - select(table) - .where(table.session_id == session_id) - .where(table.file_path == str(file_path)) - ).one() - if file_path is not None - else db.exec( - select(table) - .where(table.session_id == session_id) - .where(table.series_name == series_name) - ).one() - ) - # Create and register new entry if not present - except NoResultFound: - db_entry = ( - table( - file_path=str(file_path), - session_id=session_id, - ) - if file_path is not None - else table( - series_name=series_name, - session_id=session_id, - ) - ) - db.add(db_entry) - db.commit() - db.refresh(db_entry) - except Exception: - raise Exception - - return db_entry +class LIFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_lif: Path def register_lif_preprocessing_result( @@ -232,74 +80,159 @@ def register_lif_preprocessing_result( ) return False - # Register items in database if not already present + # Outer try-finally block for tidying up database-related section of function try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( + db=db, + table=CLEMImageStack, + session_id=session_id, + file_path=result.image_stack, + ) - clem_img_series: CLEMImageSeries = get_db_entry( - db=db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, - ) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=db, - table=CLEMImageMetadata, - session_id=session_id, - file_path=result.metadata, - ) + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) - clem_lif_file: CLEMLIFFile = get_db_entry( - db=db, - table=CLEMLIFFile, - session_id=session_id, - file_path=result.parent_lif, - ) + clem_lif_file: CLEMLIFFile = get_db_entry( + db=db, + table=CLEMLIFFile, + session_id=session_id, + file_path=result.parent_lif, + ) + + # Link tables to one another and populate fields + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_lif = clem_lif_file + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) + db.commit() + db.refresh(clem_img_stk) + + clem_img_series.associated_metadata = clem_metadata + clem_img_series.parent_lif = clem_lif_file + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) - # Link tables to one another and populate fields - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_lif = clem_lif_file - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - clem_img_stk.stack_created = True - db.add(clem_img_stk) - db.commit() - db.refresh(clem_img_stk) - - clem_img_series.associated_metadata = clem_metadata - clem_img_series.parent_lif = clem_lif_file - clem_img_series.number_of_members = result.number_of_members - db.add(clem_img_series) - db.commit() - db.refresh(clem_img_series) - - clem_metadata.parent_lif = clem_lif_file - db.add(clem_metadata) - db.commit() - db.refresh(clem_metadata) + clem_metadata.parent_lif = clem_lif_file + db.add(clem_metadata) + db.commit() + db.refresh(clem_metadata) + + logger.info( + f"LIF preprocessing results registered for {result.series_name!r} " + f"{result.channel!r} image stack" + ) + + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering LIF preprocessing result for " + f"{result.series_name!r} {result.channel!r} image stack" + ) + return False + # Load all image stacks associated with current series from database + try: + image_stacks = [ + Path(row) + for row in db.exec( + select(CLEMImageStack.file_path).where( + CLEMImageStack.series_id == clem_img_series.id + ) + ).all() + ] + logger.debug( + f"Found the following images: {[str(file) for file in image_stacks]}" + ) + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Error requesting data from database for {result.series_name!r} series" + ) + return False + + # Check if all image stacks for this series are accounted for + if not len(image_stacks) == clem_img_series.number_of_members: + logger.info( + f"Members of the series {result.series_name!r} are still missing; " + "the next stage of processing will not be triggered yet" + ) + return True + + # Request for next stage of processing if all members are present + cluster_response = submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=result.series_name, + images=image_stacks, + metadata=result.metadata, + align_self=None, + flatten="mean", + align_across=None, + messenger=_transport_object, + ) + if cluster_response is False: + logger.error( + "Error requesting align-and-merge processing job for " + f"{result.series_name!r} series" + ) + return False logger.info( - f"LIF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + "Successfully requested align-and-merge processing job for " + f"{result.series_name!r} series" ) return True - except Exception: - logger.error(traceback.format_exc()) - logger.error( - f"Exception encountered when registering LIF preprocessing result for {result.series_name!r} {result.channel!r} image stack" - ) - return False - finally: db.close() +class TIFFPreprocessingResult(BaseModel): + image_stack: Path + metadata: Path + series_name: str + channel: str + number_of_members: int + parent_tiffs: list[Path] + + @validator( + "parent_tiffs", + pre=True, + ) + def parse_stringified_list(cls, value): + if isinstance(value, str): + try: + eval_result = literal_eval(value) + if isinstance(eval_result, list): + parent_tiffs = [Path(p) for p in eval_result] + return parent_tiffs + except (SyntaxError, ValueError): + raise ValueError("Unable to parse input") + # Return value as-is; if it fails, it fails + return value + + def register_tiff_preprocessing_result( message: dict, db: Session, demo: bool = False ) -> bool: @@ -330,68 +263,128 @@ def register_tiff_preprocessing_result( ) return False - # Register items in database if not already present + # Outer try-finally block for tidying up database-related section of function try: - clem_img_stk: CLEMImageStack = get_db_entry( - db=db, - table=CLEMImageStack, - session_id=session_id, - file_path=result.image_stack, - ) - clem_img_series: CLEMImageSeries = get_db_entry( - db=db, - table=CLEMImageSeries, - session_id=session_id, - series_name=result.series_name, - ) - clem_metadata: CLEMImageMetadata = get_db_entry( - db=db, - table=CLEMImageMetadata, - session_id=session_id, - file_path=result.metadata, - ) - - # Link tables to one another and populate fields - # Register TIFF files and populate them iteratively first - for file in result.parent_tiffs: - clem_tiff_file: CLEMTIFFFile = get_db_entry( + # Register items in database if not already present + try: + clem_img_stk: CLEMImageStack = get_db_entry( db=db, - table=CLEMTIFFFile, + table=CLEMImageStack, session_id=session_id, - file_path=file, + file_path=result.image_stack, ) - clem_tiff_file.associated_metadata = clem_metadata - clem_tiff_file.child_series = clem_img_series - clem_tiff_file.child_stack = clem_img_stk - db.add(clem_tiff_file) + clem_img_series: CLEMImageSeries = get_db_entry( + db=db, + table=CLEMImageSeries, + session_id=session_id, + series_name=result.series_name, + ) + clem_metadata: CLEMImageMetadata = get_db_entry( + db=db, + table=CLEMImageMetadata, + session_id=session_id, + file_path=result.metadata, + ) + + # Link tables to one another and populate fields + # Register TIFF files and populate them iteratively first + for file in result.parent_tiffs: + clem_tiff_file: CLEMTIFFFile = get_db_entry( + db=db, + table=CLEMTIFFFile, + session_id=session_id, + file_path=file, + ) + clem_tiff_file.associated_metadata = clem_metadata + clem_tiff_file.child_series = clem_img_series + clem_tiff_file.child_stack = clem_img_stk + db.add(clem_tiff_file) + db.commit() + db.refresh(clem_tiff_file) + + clem_img_stk.associated_metadata = clem_metadata + clem_img_stk.parent_series = clem_img_series + clem_img_stk.channel_name = result.channel + clem_img_stk.stack_created = True + db.add(clem_img_stk) db.commit() - db.refresh(clem_tiff_file) - - clem_img_stk.associated_metadata = clem_metadata - clem_img_stk.parent_series = clem_img_series - clem_img_stk.channel_name = result.channel - clem_img_stk.stack_created = True - db.add(clem_img_stk) - db.commit() - db.refresh(clem_img_stk) - - clem_img_series.associated_metadata = clem_metadata - clem_img_series.number_of_members = result.number_of_members - db.add(clem_img_series) - db.commit() - db.refresh(clem_img_series) + db.refresh(clem_img_stk) + + clem_img_series.associated_metadata = clem_metadata + clem_img_series.number_of_members = result.number_of_members + db.add(clem_img_series) + db.commit() + db.refresh(clem_img_series) + + logger.info( + f"TIFF preprocessing results registered for {result.series_name!r} " + f"{result.channel!r} image stack" + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + "Exception encountered when registering TIFF preprocessing result for " + f"{result.series_name!r} {result.channel!r} image stack" + ) + return False + + # Load all image stacks associated with current series from database + try: + image_stacks = [ + Path(row) + for row in db.exec( + select(CLEMImageStack.file_path).where( + CLEMImageStack.series_id == clem_img_series.id + ) + ).all() + ] + logger.debug( + f"Found the following images: {[str(file) for file in image_stacks]}" + ) + instrument_name = ( + db.exec(select(MurfeySession).where(MurfeySession.id == session_id)) + .one() + .instrument_name + ) + except Exception: + logger.error(traceback.format_exc()) + logger.error( + f"Error requesting data from database for {result.series_name!r} series" + ) + return False + + # Check if all image stacks for this series are accounted for + if not len(image_stacks) == clem_img_series.number_of_members: + logger.info( + f"Members of the series {result.series_name!r} are still missing; " + "the next stage of processing will not be triggered yet" + ) + return True + + # Request for next stage of processing if all members are present + cluster_response = submit_cluster_request( + session_id=session_id, + instrument_name=instrument_name, + series_name=result.series_name, + images=image_stacks, + metadata=result.metadata, + align_self=None, + flatten="mean", + align_across=None, + messenger=_transport_object, + ) + if cluster_response is False: + logger.error( + "Error requesting align-and-merge processing job for " + f"{result.series_name!r} series" + ) + return False logger.info( - f"TIFF preprocessing results registered for {result.series_name!r} {result.channel!r} image stack" + "Successfully requested align-and-merge processing job for " + f"{result.series_name!r} series" ) return True - except Exception: - logger.error(traceback.format_exc()) - logger.error( - f"Exception encountered when registering TIFF preprocessing result for {result.series_name!r} {result.channel!r} image stack" - ) - return False - finally: db.close()